[GitHub] spark issue #23262: [SPARK-26312][SQL]Converting converters in RDDConversion...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23262 Good catch, LGTM cc @cloud-fan
[GitHub] spark issue #23226: [SPARK-26286][TEST] Add MAXIMUM_PAGE_SIZE_BYTES exceptio...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23226 retest this please
[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23239 LGTM
[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23239 The change looks fine. Do we already have tests for cases 2 and 4? We know the test for case 3 is [here](https://github.com/apache/spark/pull/23043).
[GitHub] spark pull request #21777: [WIP][SPARK-24498][SQL] Add JDK compiler for runt...
Github user kiszk closed the pull request at: https://github.com/apache/spark/pull/21777
[GitHub] spark issue #23206: [SPARK-26249][SQL] Add ability to inject a rule in order...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23206 cc @viirya @maropu
[GitHub] spark pull request #23206: [SPARK-26249][SQL] Add ability to inject a rule i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23206#discussion_r238776051 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -235,10 +235,127 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) */ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil + /** + * Seq of Optimizer rule to be added after or before a rule in a specific batch + */ + def optimizerRulesInOrder: Seq[RuleInOrder] = Nil + + /** + * Batches to add to the optimizer in a specific order with respect to a existing batch + * Seq of Tuple(existing batch name, order, Batch to add). + */ + def optimizerBatches: Seq[(String, Order.Value, Batch)] = Nil + + /** + * Return the batch after removing rules that need to be excluded + */ + private def handleExcludedRules(batch: Batch, excludedRules: Seq[String]): Seq[Batch] = { +// Excluded rules +val filteredRules = batch.rules.filter { rule => + val exclude = excludedRules.contains(rule.ruleName) + if (exclude) { +logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.") + } + !exclude +} +if (batch.rules == filteredRules) { + Seq(batch) +} else if (filteredRules.nonEmpty) { + Seq(Batch(batch.name, batch.strategy, filteredRules: _*)) +} else { + logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " + +s"as all enclosed rules have been excluded.") + Seq.empty +} + } + + /** + * Add the customized rules and batch in order to the optimizer batches. + * excludedRules - rules that will be excluded --- End diff -- nit: `* @param excludedRules ...` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
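For context, the `handleExcludedRules` helper in the diff above filters out rules listed in the existing `spark.sql.optimizer.excludedRules` configuration. A minimal usage sketch (the excluded rule name is only an example, not something this PR mandates):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: exclude an optimizer rule by its fully-qualified name; the filtering
// logic shown in the diff above drops it (and any batch left empty) at runtime.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("excluded-rules-sketch")
  .config("spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.ConstantFolding")
  .getOrCreate()

spark.range(10).selectExpr("id + 1 + 1").explain(true)
```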
[GitHub] spark issue #23190: [MINOR][SQL]throw SparkOutOfMemoryError intead of SparkE...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23190 LGTM except for two comments
[GitHub] spark pull request #23190: [MINOR][SQL]throw SparkOutOfMemoryError intead of...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23190#discussion_r238123212 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala --- @@ -24,7 +24,8 @@ import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark.{SparkConf, SparkEnv, SparkException} import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED -import org.apache.spark.memory.{MemoryConsumer, StaticMemoryManager, TaskMemoryManager} +import org.apache.spark.memory.{MemoryConsumer, SparkOutOfMemoryError, --- End diff -- Is it better to use `_`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23190: [MINOR][SQL]throw SparkOutOfMemoryError intead of SparkE...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23190 Is this a follow-up of #23084?
[GitHub] spark issue #23190: [MINOR][SQL]throw SparkOutOfMemoryError intead of SparkE...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23190 retest this please
[GitHub] spark issue #23199: [SPARK-26245][SQL] Add Float literal
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23199 cc @maropu @gatorsmile
[GitHub] spark issue #23199: [SPARK-26245][SQL] Add Float literal
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23199 retest this please
[GitHub] spark pull request #23146: [SPARK-26173] [MLlib] Prior regularization for Lo...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23146#discussion_r238104839 --- Diff: mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala --- @@ -82,7 +82,72 @@ private[ml] class L2Regularization( } (0.5 * sum * regParam, Vectors.dense(gradient)) case _: SparseVector => -throw new IllegalArgumentException("Sparse coefficients are not currently supported.") +throw new IllegalArgumentException( + "Sparse coefficients are not currently supported.") +} + } +} + + +/** + * Implements regularization for Maximum A Posteriori (MAP) optimization + * based on prior means (coefficients) and precisions. + * + * @param priorMean Prior coefficients (multivariate mean). + * @param priorPrecisions Prior precisions. + * @param regParam The magnitude of the regularization. + * @param shouldApply A function (Int => Boolean) indicating whether a given index should have + *regularization applied to it. Usually we don't apply regularization to + *the intercept. + * @param applyFeaturesStd Option for a function which maps coefficient index (column major) to the + * feature standard deviation. Since we always standardize the data during + * training, if `standardization` is false, we have to reverse + * standardization by penalizing each component differently by this param. + * If `standardization` is true, this should be `None`. + */ +private[ml] class PriorRegularization( +priorMean: Array[Double], +priorPrecisions: Array[Double], +override val regParam: Double, +shouldApply: Int => Boolean, +applyFeaturesStd: Option[Int => Double]) +extends DifferentiableRegularization[Vector] { + + override def calculate(coefficients: Vector): (Double, Vector) = { +coefficients match { + case dv: DenseVector => +var sum = 0.0 +val gradient = new Array[Double](dv.size) +dv.values.indices.filter(shouldApply).foreach { j => + val coef = coefficients(j) + val priorCoef = priorMean(j) + val priorPrecision = priorPrecisions(j) + applyFeaturesStd match { +case Some(getStd) => + // If `standardization` is false, we still standardize the data + // to improve the rate of convergence; as a result, we have to + // perform this reverse standardization by penalizing each component + // differently to get effectively the same objective function when + // the training dataset is not standardized. + val std = getStd(j) + if (std != 0.0) { +val temp = (coef - priorCoef) / (std * std) +sum += (coef - priorCoef) * temp * priorPrecision +gradient(j) = regParam * priorPrecision * temp + } else { +0.0 --- End diff -- Who consumes `0.0`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
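For readers of the diff above, the dense-vector branch appears to compute a Gaussian-prior (MAP) penalty of roughly the following form, where λ is `regParam`, π_j is `priorPrecisions(j)`, μ_j is `priorMean(j)`, σ_j is the feature standard deviation (taken as 1 when `applyFeaturesStd` is `None`), and the sum runs over indices for which `shouldApply` is true; the final λ/2 scaling is inferred from the analogous `L2Regularization` branch shown at the top of the diff:

```
\mathcal{R}(\beta) = \frac{\lambda}{2} \sum_{j:\, \mathrm{shouldApply}(j)} \pi_j \, \frac{(\beta_j - \mu_j)^2}{\sigma_j^2},
\qquad
\frac{\partial \mathcal{R}}{\partial \beta_j} = \lambda \, \pi_j \, \frac{\beta_j - \mu_j}{\sigma_j^2}
```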
[GitHub] spark pull request #23194: [MINOR][SQL] Combine the same codes in test cases
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23194#discussion_r238062387 --- Diff: core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala --- @@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging * tasks was performed by the ShuffleMemoryManager. * * @param lock a [[MemoryManager]] instance to synchronize on - * @param memoryMode the type of memory tracked by this pool (on- or off-heap) + * @param memoryMode the type of memory tracked by this pool (on-heap or off-heap) --- End diff -- Is this change related to this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23194: [MINOR][SQL] Combine the same codes in test cases
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23194#discussion_r238062396 --- Diff: core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala --- @@ -28,7 +28,7 @@ import org.apache.spark.storage.memory.MemoryStore * (caching). * * @param lock a [[MemoryManager]] instance to synchronize on - * @param memoryMode the type of memory tracked by this pool (on- or off-heap) + * @param memoryMode the type of memory tracked by this pool (on-heap or off-heap) --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23194: [MINOR][SQL] Combine the same codes in test cases
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23194 ok to test
[GitHub] spark issue #23194: [MINOR][SQL] Combine the same codes in test cases
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23194 Good catch
[GitHub] spark issue #23177: [SPARK-26212][Build][test-maven] Upgrade maven version t...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23177 Sure, updated. Thanks for letting me know about them.
[GitHub] spark issue #23154: [SPARK-26195][SQL] Correct exception messages in some cl...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23154 LGTM cc @cloud-fan
[GitHub] spark issue #23177: [SPARK-26212][Build][test-maven] Upgrade maven version t...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23177 I thought that it was automatically done by `build/mvn`, as you pointed out [before](https://github.com/apache/spark/pull/21905#issuecomment-408678119).
[GitHub] spark issue #23177: [SPARK-26212][Build][test-maven] Upgrade maven version t...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23177 retest this please
[GitHub] spark issue #23176: [SPARK-26211][SQL] Fix InSet for binary, and struct and ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23176 LGTM
[GitHub] spark issue #23176: [SPARK-26211][SQL] Fix InSet for binary, and struct and ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23176 retest this please
[GitHub] spark pull request #23177: [SPARK-26212][Build][test-maven] Upgrade maven ve...
GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/23177 [SPARK-26212][Build][test-maven] Upgrade maven version to 3.6.0 ## What changes were proposed in this pull request? This PR updates the Maven version from 3.5.4 to 3.6.0. The release notes for 3.6.0 are [here](https://maven.apache.org/docs/3.6.0/release-notes.html). According to [the 3.6.0 release notes](https://maven.apache.org/docs/3.6.0/release-notes.html), the notable changes are: 1. There were issues related to the project discovery time, which had increased in the previous version and affected some users. 1. The output in the reactor summary has been improved. 1. There was an issue related to classpath ordering. ## How was this patch tested? Existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/kiszk/spark SPARK-26212 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23177.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23177 commit a5587b5f8468eaf946b89a851e0949231445a4af Author: Kazuaki Ishizaki Date: 2018-11-29T08:14:09Z initial commit
[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23124 retest this please
[GitHub] spark pull request #23154: [SPARK-26195][SQL] Correct exception messages in ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23154#discussion_r236919935 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -258,7 +258,7 @@ case class GeneratorOuter(child: Generator) extends UnaryExpression with Generat throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = -throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") +throw new UnsupportedOperationException(s"Cannot generate code expression: $this") --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23154: [SPARK-26195][SQL] Correct exception messages in ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23154#discussion_r236919395 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala --- @@ -204,10 +204,10 @@ case class UnresolvedGenerator(name: FunctionIdentifier, children: Seq[Expressio throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = -throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") +throw new UnsupportedOperationException(s"Cannot generate code expression: $this") --- End diff -- Is it better to use `generate code for expression` or others rather than `generate code expression`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
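For concreteness, the wording suggested in the comment above would make the override read roughly as follows (a sketch of the suggestion, not the merged patch):

```scala
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
  throw new UnsupportedOperationException(s"Cannot generate code for expression: $this")
```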
[GitHub] spark issue #23154: [SPARK-26195][SQL] Correct exception messages in some cl...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23154 @lcqzte10192193 I am sorry for my misunderstanding. The original code in `VectorizedRleValuesReader.java` was correct. Could you please revert your change?
[GitHub] spark pull request #23151: [SPARK-26180][CORE][TEST] Add a withCreateTempDir...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23151#discussion_r236912228 --- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala --- @@ -105,5 +105,16 @@ abstract class SparkFunSuite logInfo(s"\n\n= FINISHED $shortSuiteName: '$testName' =\n") } } - + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withCreateTempDir(f: File => Unit): Unit = { +val dir = Utils.createTempDir() --- End diff -- Is it better to call `.getCanonicalFile`, too? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23151: [SPARK-26180][CORE][TEST] Add a withCreateTempDir...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23151#discussion_r236912182 --- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala --- @@ -105,5 +105,16 @@ abstract class SparkFunSuite logInfo(s"\n\n= FINISHED $shortSuiteName: '$testName' =\n") } } - + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withCreateTempDir(f: File => Unit): Unit = { --- End diff -- Is there any reason not to use `withTempDir` as a function name like other modules? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
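Taken together, the two comments above suggest a helper along these lines; this is a sketch only, and assumes `Utils.deleteRecursively` is used for cleanup as in the existing `withTempDir` helpers elsewhere in the code base:

```scala
import java.io.File
import org.apache.spark.util.Utils

/**
 * Creates a temporary directory, which is then passed to `f` and deleted
 * after `f` returns.
 */
protected def withTempDir(f: File => Unit): Unit = {
  // Resolve the canonical path up front so tests are not confused by
  // symlinked temp locations such as /tmp on macOS.
  val dir = Utils.createTempDir().getCanonicalFile
  try f(dir) finally Utils.deleteRecursively(dir)
}
```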
[GitHub] spark issue #23151: [SPARK-26180][CORE][TEST] Add a withCreateTempDir functi...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23151 Good catch
[GitHub] spark issue #23154: [SQL] Correct two exception message in UnresolvedGenerat...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23154 Good catch. I believe other files (e.g. `VectorizedRleValuesReader.java`, `Expression.scala`, and `generators.scala`) also have a similar problem. Can this PR address them?
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236376102 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -89,7 +89,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { val msg1 = intercept[Exception] { df5.select(map_from_arrays($"k", $"v")).collect }.getMessage -assert(msg1.contains("Cannot use null as map key!")) +assert(msg1.contains("Cannot use null as map key")) --- End diff -- Message at Line 98 is also changed now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23141: [SPARK-26021][SQL][followup] add test for special floati...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23141 LGTM
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236284636 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. + mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) + } + + // TODO: specialize it + private lazy val keys = mutable.ArrayBuffer.empty[Any] + private lazy val values = mutable.ArrayBuffer.empty[Any] + + private lazy val keyGetter = InternalRow.getAccessor(keyType) + private lazy val valueGetter = InternalRow.getAccessor(valueType) + + def reset(): Unit = { +keyToIndex.clear() +keys.clear() +values.clear() + } + + def put(key: Any, value: Any): Unit = { +if (key == null) { + throw new RuntimeException("Cannot use null as map key.") +} + +val maybeExistingIdx = keyToIndex.get(key) +if (maybeExistingIdx.isDefined) { + // Overwrite the previous value, as the policy is last wins. + values(maybeExistingIdx.get) = value +} else { + keyToIndex.put(key, values.length) + keys.append(key) + values.append(value) +} + } + + // write a 2-field row, the first field is key and the second field is value. 
+ def put(entry: InternalRow): Unit = { +if (entry.isNullAt(0)) { + throw new RuntimeException("Cannot use null as map key.") +} +put(keyGetter(entry, 0), valueGetter(entry, 1)) + } + + def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = { +if (keyArray.length != valueArray.length) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.length) { + put(keyArray(i), valueArray(i)) + i += 1 +} + } + + def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = { +if (keyArray.numElements() != valueArray.numElements()) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.numElements()) { + put(keyGetter(keyArray, i), valueGetter(valueArray, i)) + i += 1 +} + } + + def build(): ArrayBasedMapData = { +new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray)) + } + + def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = { +assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.") +putAll(keyArray, valueArray) --- End diff -- Ah, you are right. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23137: [SPARK-26169] Create DataFrameSetOperationsSuite
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23137 LGTM, pending Jenkins
[GitHub] spark issue #23135: [SPARK-26168][SQL] Update the code comments in Expressio...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23135 LGTM
[GitHub] spark pull request #23135: [SPARK-26168][SQL] Update the code comments in Ex...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23135#discussion_r236104602 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -43,9 +43,24 @@ import org.apache.spark.sql.types._ * There are a few important traits: * * - [[Nondeterministic]]: an expression that is not deterministic. + * - [[Stateful]]: an expression that contains mutable state. For example, MonotonicallyIncreasingID + * and Rand. A stateful expression is always non-deterministic. * - [[Unevaluable]]: an expression that is not supposed to be evaluated. * - [[CodegenFallback]]: an expression that does not have code gen implemented and falls back to *interpreted mode. + * - [[NullIntolerant]]: an expression that is null intolerant (i.e. any null input will result in + * null output). + * - [[NonSQLExpression]]: a common base trait for the expressions that doesn't have SQL --- End diff -- nit: `doesn't` -> `do not` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23135: [SPARK-26168][SQL] Update the code comments in Ex...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23135#discussion_r236103936 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -43,9 +43,24 @@ import org.apache.spark.sql.types._ * There are a few important traits: * * - [[Nondeterministic]]: an expression that is not deterministic. + * - [[Stateful]]: an expression that contains mutable state. For example, MonotonicallyIncreasingID + * and Rand. A stateful expression is always non-deterministic. * - [[Unevaluable]]: an expression that is not supposed to be evaluated. * - [[CodegenFallback]]: an expression that does not have code gen implemented and falls back to *interpreted mode. + * - [[NullIntolerant]]: an expression that is null intolerant (i.e. any null input will result in + * null output). + * - [[NonSQLExpression]]: a common base trait for the expressions that doesn't have SQL + * expressions like representation. For example, `ScalaUDF`, `ScalaUDAF`, + * and object `MapObjects` and `Invoke`. + * - [[UserDefinedExpression]]: a common base trait for user-defined functions, including + * UDF/UDAF/UDTF. + * - [[HigherOrderFunction]]: a common base trait for higher order functions that take one or more + *(lambda) functions and applies these to some objects. The function + *produces a number of variables which can be consumed by some lambda + *function. --- End diff -- nit: `function` -> `functions` ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22512: [SPARK-25498][SQL] InterpretedMutableProjection should h...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22512 retest this please
[GitHub] spark issue #23022: [SPARK-26038] Decimal toScalaBigInt/toJavaBigInteger for...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23022 LGTM
[GitHub] spark pull request #23102: [SPARK-26137][CORE] Use Java system property "fil...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23102#discussion_r235975268 --- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala --- @@ -61,11 +62,12 @@ private[deploy] object DependencyUtils extends Logging { hadoopConf: Configuration, secMgr: SecurityManager): String = { val targetDir = Utils.createTempDir() +val fileSeparator = Pattern.quote(System.getProperty("file.separator")) Option(jars) .map { resolveGlobPaths(_, hadoopConf) .split(",") - .filterNot(_.contains(userJar.split("/").last)) + .filterNot(_.contains(userJar.split(fileSeparator).last)) --- End diff -- Beyond the original purpose of this PR, is it better to move `userJar.split(fileSeparator).last` before line 66? This is because `userJar` is not changed in `map { ... }`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
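A sketch of the hoisting suggested above: since `userJar` does not change inside the `map` block, its file name can be computed once before the chain. Identifiers follow the diff, `java.util.regex.Pattern` is assumed to be imported as in the PR, and the tail of the chain is elided:

```scala
// Hoist the invariant computation out of the map block.
val fileSeparator = Pattern.quote(System.getProperty("file.separator"))
val userJarName = userJar.split(fileSeparator).last
Option(jars)
  .map {
    resolveGlobPaths(_, hadoopConf)
      .split(",")
      .filterNot(_.contains(userJarName))
    // ... rest of the original chain unchanged
  }
```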
[GitHub] spark issue #23102: [SPARK-26137][CORE] Use Java system property "file.separ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23102 @MaxGekk This PR may change the separator handling for a `userJar` that contains `\` on Windows; `resolveGlobPaths` is not applied to `userJar`.
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235952965 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. + mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) + } + + // TODO: specialize it + private lazy val keys = mutable.ArrayBuffer.empty[Any] + private lazy val values = mutable.ArrayBuffer.empty[Any] + + private lazy val keyGetter = InternalRow.getAccessor(keyType) + private lazy val valueGetter = InternalRow.getAccessor(valueType) + + def reset(): Unit = { +keyToIndex.clear() +keys.clear() +values.clear() + } + + def put(key: Any, value: Any): Unit = { +if (key == null) { + throw new RuntimeException("Cannot use null as map key.") +} + +val maybeExistingIdx = keyToIndex.get(key) +if (maybeExistingIdx.isDefined) { + // Overwrite the previous value, as the policy is last wins. + values(maybeExistingIdx.get) = value +} else { + keyToIndex.put(key, values.length) + keys.append(key) + values.append(value) +} + } + + // write a 2-field row, the first field is key and the second field is value. 
+ def put(entry: InternalRow): Unit = { +if (entry.isNullAt(0)) { + throw new RuntimeException("Cannot use null as map key.") +} +put(keyGetter(entry, 0), valueGetter(entry, 1)) + } + + def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = { +if (keyArray.length != valueArray.length) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.length) { + put(keyArray(i), valueArray(i)) + i += 1 +} + } + + def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = { +if (keyArray.numElements() != valueArray.numElements()) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.numElements()) { + put(keyGetter(keyArray, i), valueGetter(valueArray, i)) + i += 1 +} + } + + def build(): ArrayBasedMapData = { +new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray)) --- End diff -- Is it better to call reset() after calling new ArrayBasedMapData to reduce memory consumption in Java heap? At caller side, ArrayBasedMapBuilder is not released. Therefore, until reset() will be called next time, each ArrayBasedMapBuilder keeps unused data in keys, values, and keyToIndex. They consumes Java heap unexpectedly. ---
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235950666 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. + mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) + } + + // TODO: specialize it + private lazy val keys = mutable.ArrayBuffer.empty[Any] + private lazy val values = mutable.ArrayBuffer.empty[Any] + + private lazy val keyGetter = InternalRow.getAccessor(keyType) + private lazy val valueGetter = InternalRow.getAccessor(valueType) + + def reset(): Unit = { +keyToIndex.clear() +keys.clear() +values.clear() + } + + def put(key: Any, value: Any): Unit = { +if (key == null) { + throw new RuntimeException("Cannot use null as map key.") +} + +val maybeExistingIdx = keyToIndex.get(key) +if (maybeExistingIdx.isDefined) { + // Overwrite the previous value, as the policy is last wins. + values(maybeExistingIdx.get) = value +} else { + keyToIndex.put(key, values.length) + keys.append(key) + values.append(value) +} + } + + // write a 2-field row, the first field is key and the second field is value. 
+ def put(entry: InternalRow): Unit = { +if (entry.isNullAt(0)) { + throw new RuntimeException("Cannot use null as map key.") +} +put(keyGetter(entry, 0), valueGetter(entry, 1)) + } + + def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = { +if (keyArray.length != valueArray.length) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.length) { + put(keyArray(i), valueArray(i)) + i += 1 +} + } + + def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = { +if (keyArray.numElements() != valueArray.numElements()) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.numElements()) { + put(keyGetter(keyArray, i), valueGetter(valueArray, i)) + i += 1 +} + } + + def build(): ArrayBasedMapData = { +new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray)) + } + + def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = { +assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.") +putAll(keyArray, valueArray) +if (keyToIndex.size == keyArray.numElements()) { + // If there is no duplicated map keys, creates the MapData with the input key and va
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235950148 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. + mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) + } + + // TODO: specialize it + private lazy val keys = mutable.ArrayBuffer.empty[Any] + private lazy val values = mutable.ArrayBuffer.empty[Any] + + private lazy val keyGetter = InternalRow.getAccessor(keyType) + private lazy val valueGetter = InternalRow.getAccessor(valueType) + + def reset(): Unit = { +keyToIndex.clear() +keys.clear() +values.clear() + } + + def put(key: Any, value: Any): Unit = { +if (key == null) { + throw new RuntimeException("Cannot use null as map key.") +} + +val maybeExistingIdx = keyToIndex.get(key) +if (maybeExistingIdx.isDefined) { + // Overwrite the previous value, as the policy is last wins. + values(maybeExistingIdx.get) = value +} else { + keyToIndex.put(key, values.length) + keys.append(key) + values.append(value) +} + } + + // write a 2-field row, the first field is key and the second field is value. 
+ def put(entry: InternalRow): Unit = { +if (entry.isNullAt(0)) { + throw new RuntimeException("Cannot use null as map key.") +} +put(keyGetter(entry, 0), valueGetter(entry, 1)) + } + + def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = { +if (keyArray.length != valueArray.length) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.length) { + put(keyArray(i), valueArray(i)) + i += 1 +} + } + + def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = { +if (keyArray.numElements() != valueArray.numElements()) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.numElements()) { + put(keyGetter(keyArray, i), valueGetter(valueArray, i)) + i += 1 +} + } + + def build(): ArrayBasedMapData = { +new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray)) + } --- End diff -- Is it better to call `reset()` after calling `new ArrayBasedMapData` to reduce memory consumption? At caller side, `ArrayBasedMapBuilder` is not released. Therefore, until reset() will be called next time, each `ArrayBasedMapBuilder` keeps unused data in `keys`, `values`, and `keyToIndex`. They consumes Java heap unexpectedly. ---
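A sketch of the change proposed in the two comments above, under the assumption that calling `reset()` right after materializing the result is safe for all callers:

```scala
def build(): ArrayBasedMapData = {
  val map = new ArrayBasedMapData(
    new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
  // Drop the builder's references to keys/values/keyToIndex so a long-lived
  // builder does not keep the already-built data reachable on the Java heap.
  reset()
  map
}
```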
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235947044 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType} + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] +case _ => + // for complex types, use interpreted ordering to be able to compare unsafe data with safe + // data, e.g. UnsafeRow vs GenericInternalRow. + mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) + } + + // TODO: specialize it + private lazy val keys = mutable.ArrayBuffer.empty[Any] + private lazy val values = mutable.ArrayBuffer.empty[Any] + + private lazy val keyGetter = InternalRow.getAccessor(keyType) + private lazy val valueGetter = InternalRow.getAccessor(valueType) + + def reset(): Unit = { +keyToIndex.clear() +keys.clear() +values.clear() + } + + def put(key: Any, value: Any): Unit = { +if (key == null) { + throw new RuntimeException("Cannot use null as map key.") +} + +val maybeExistingIdx = keyToIndex.get(key) +if (maybeExistingIdx.isDefined) { + // Overwrite the previous value, as the policy is last wins. + values(maybeExistingIdx.get) = value +} else { + keyToIndex.put(key, values.length) + keys.append(key) + values.append(value) +} + } + + // write a 2-field row, the first field is key and the second field is value. 
+ def put(entry: InternalRow): Unit = { +if (entry.isNullAt(0)) { + throw new RuntimeException("Cannot use null as map key.") +} +put(keyGetter(entry, 0), valueGetter(entry, 1)) + } + + def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = { +if (keyArray.length != valueArray.length) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.length) { + put(keyArray(i), valueArray(i)) + i += 1 +} + } + + def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = { +if (keyArray.numElements() != valueArray.numElements()) { + throw new RuntimeException( +"The key array and value array of MapData must have the same length.") +} + +var i = 0 +while (i < keyArray.numElements()) { + put(keyGetter(keyArray, i), valueGetter(valueArray, i)) + i += 1 +} + } + + def build(): ArrayBasedMapData = { +new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray)) + } + + def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = { +assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.") +putAll(keyArray, valueArray) --- End diff -- Can we call `new ArrayBasedMapData(keyArray, valueArray)` withou
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r235943290 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -751,171 +739,46 @@ case class MapFromEntries(child: Expression) extends UnaryExpression { s"${child.dataType.catalogString} type. $prettyName accepts only arrays of pair structs.") } + private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType) + override protected def nullSafeEval(input: Any): Any = { -val arrayData = input.asInstanceOf[ArrayData] -val numEntries = arrayData.numElements() +val entries = input.asInstanceOf[ArrayData] +val numEntries = entries.numElements() var i = 0 -if(nullEntries) { +if (nullEntries) { while (i < numEntries) { -if (arrayData.isNullAt(i)) return null +if (entries.isNullAt(i)) return null i += 1 } } -val keyArray = new Array[AnyRef](numEntries) -val valueArray = new Array[AnyRef](numEntries) + +mapBuilder.reset() i = 0 while (i < numEntries) { - val entry = arrayData.getStruct(i, 2) - val key = entry.get(0, dataType.keyType) - if (key == null) { -throw new RuntimeException("The first field from a struct (key) can't be null.") - } - keyArray.update(i, key) - val value = entry.get(1, dataType.valueType) - valueArray.update(i, value) + mapBuilder.put(entries.getStruct(i, 2)) i += 1 } -ArrayBasedMapData(keyArray, valueArray) +mapBuilder.build() } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { nullSafeCodeGen(ctx, ev, c => { val numEntries = ctx.freshName("numEntries") - val isKeyPrimitive = CodeGenerator.isPrimitiveType(dataType.keyType) - val isValuePrimitive = CodeGenerator.isPrimitiveType(dataType.valueType) - val code = if (isKeyPrimitive && isValuePrimitive) { -genCodeForPrimitiveElements(ctx, c, ev.value, numEntries) --- End diff -- This change allow us to focus on optimizing `ArrayBasedMapBuilder`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23101 LGTM
[GitHub] spark issue #23102: [SPARK-26137][CORE] Use Java system property "file.separ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23102 Would it be possible to update the PR description based on the template?
[GitHub] spark issue #23102: [SPARK-26137][CORE] Use Java system property "file.separ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23102 Thank you for submitting a PR to fix a hard-coded character. Is this the only one we have to fix regarding this hard-coded character?
[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23101 retest this please
[GitHub] spark issue #23101: [SPARK-26134][CORE] Upgrading Hadoop to 2.7.4 to fix jav...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23101 ok to test
[GitHub] spark issue #23084: [SPARK-26117][CORE][SQL]use SparkOutOfMemoryError instea...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23084 I think that we need to take care of `UnsafeExternalSorterSuite.testGetIterator`, too.
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Unsaf...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23043 Do we need to consider `GenerateSafeProjection`, too? In other words, if the generated code or runtime does not use data in `Unsafe`, this `+0.0/-0.0` problem may still exist. Am I correct?
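As a small illustration of why the `+0.0/-0.0` question matters wherever raw bytes are compared or hashed (for example in `UnsafeRow`-based grouping), even though the two values compare equal as doubles:

```scala
// -0.0 and 0.0 are equal under floating-point comparison...
assert(-0.0 == 0.0)
// ...but their raw bit patterns differ, so byte-wise equality/hashing can
// treat them as two distinct keys unless one is normalized to the other.
assert(java.lang.Double.doubleToRawLongBits(-0.0) !=
  java.lang.Double.doubleToRawLongBits(0.0))
```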
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Unsaf...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23043 Is it better to update this PR title now?
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Unsaf...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23043 @srowen #21794 is what I had in mind.
[GitHub] spark pull request #22779: [SPARK-25786][CORE]If the ByteBuffer.hasArray is ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22779#discussion_r234204540 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala --- @@ -497,6 +498,17 @@ class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSpar deserializationStream.close() assert(serInstance.deserialize[Any](helloHello) === ((hello, hello))) } + + test("ByteBuffer.array -- UnsupportedOperationException") { --- End diff -- It would be good to add a prefix like "SPARK-25786: ...". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23039: [SPARK-26066][SQL] Move truncatedString to sql/ca...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23039#discussion_r234202827 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1594,6 +1594,13 @@ object SQLConf { "WHERE, which does not follow SQL standard.") .booleanConf .createWithDefault(false) + + val MAX_TO_STRING_FIELDS = buildConf("spark.sql.debug.maxToStringFields") +.doc("Maximum number of fields of sequence-like entries that can be converted to strings " + --- End diff -- nit: `that` is not necessary if I am correct. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23043#discussion_r233951725 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala --- @@ -56,17 +56,32 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean) val javaType = JavaCode.javaType(dataType) val value = CodeGenerator.getValue(ctx.INPUT_ROW, dataType, ordinal.toString) if (nullable) { -ev.copy(code = +var codeBlock = code""" |boolean ${ev.isNull} = ${ctx.INPUT_ROW}.isNullAt($ordinal); |$javaType ${ev.value} = ${ev.isNull} ? | ${CodeGenerator.defaultValue(dataType)} : ($value); - """.stripMargin) + """.stripMargin +codeBlock = codeBlock + genReplaceMinusZeroWithZeroCode(javaType.codeString, ev.value) +ev.copy(code = codeBlock) } else { -ev.copy(code = code"$javaType ${ev.value} = $value;", isNull = FalseLiteral) +var codeBlock = code"$javaType ${ev.value} = $value;" +codeBlock = codeBlock + genReplaceMinusZeroWithZeroCode(javaType.codeString, ev.value) +ev.copy(code = codeBlock, isNull = FalseLiteral) } } } + + private def genReplaceMinusZeroWithZeroCode(javaType: String, value: String): Block = { +val code = s"\nif ($value == -0.0%c) $value = 0.0%c;" +var formattedCode = "" --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/23043#discussion_r233951670 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala --- @@ -56,17 +56,32 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean) val javaType = JavaCode.javaType(dataType) val value = CodeGenerator.getValue(ctx.INPUT_ROW, dataType, ordinal.toString) if (nullable) { -ev.copy(code = +var codeBlock = code""" |boolean ${ev.isNull} = ${ctx.INPUT_ROW}.isNullAt($ordinal); |$javaType ${ev.value} = ${ev.isNull} ? | ${CodeGenerator.defaultValue(dataType)} : ($value); - """.stripMargin) + """.stripMargin +codeBlock = codeBlock + genReplaceMinusZeroWithZeroCode(javaType.codeString, ev.value) +ev.copy(code = codeBlock) } else { -ev.copy(code = code"$javaType ${ev.value} = $value;", isNull = FalseLiteral) +var codeBlock = code"$javaType ${ev.value} = $value;" --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Unsaf...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23043 IIUC, we discussed handling `+0.0` and `-0.0` before in another PR. @srowen do you remember the previous discussion? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
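For context, a small standalone Scala sketch (not Spark code) of why `-0.0` needs normalization before binary or hash-based comparisons: the primitive comparison treats the two zeros as equal, but their bit patterns differ, so byte-wise equality would see two distinct keys.
```
object MinusZeroDemo {
  def main(args: Array[String]): Unit = {
    // IEEE 754 primitive comparison: the two zeros are equal.
    println(-0.0 == 0.0)                                 // true
    // Their bit patterns differ, so binary/hash comparisons see distinct values.
    println(java.lang.Double.doubleToRawLongBits(0.0))   // 0
    println(java.lang.Double.doubleToRawLongBits(-0.0))  // -9223372036854775808 (only the sign bit set)
    // Comparator semantics also distinguish them.
    println(java.lang.Double.compare(-0.0, 0.0))         // -1
  }
}
```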
[GitHub] spark issue #23044: [SPARK-26073][SQL][FOLLOW-UP] remove invalid comment as ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23044 LGTM, pending Jenkins --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22976: [SPARK-25974][SQL]Optimizes Generates bytecode for order...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22976 gentle ping @rednaxelafx --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22976: [SPARK-25974][SQL]Optimizes Generates bytecode for order...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22976 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22976: [SPARK-25974][SQL]Optimizes Generates bytecode for order...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22976 cc @cloud-fan @mgaido91 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22993: [SPARK-24421][BUILD][CORE] Accessing sun.misc.Cle...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22993#discussion_r232488912 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java --- @@ -67,6 +67,59 @@ unaligned = _unaligned; } + // Access fields and constructors once and store them, for performance: + + private static final Constructor DBB_CONSTRUCTOR; + private static final Field DBB_CLEANER_FIELD; + static { +try { + Class cls = Class.forName("java.nio.DirectByteBuffer"); + Constructor constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE); + constructor.setAccessible(true); + Field cleanerField = cls.getDeclaredField("cleaner"); + cleanerField.setAccessible(true); + DBB_CONSTRUCTOR = constructor; + DBB_CLEANER_FIELD = cleanerField; +} catch (ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e) { + throw new IllegalStateException(e); +} + } + + private static final Method CLEANER_CREATE_METHOD; + static { +// The implementation of Cleaner changed from JDK 8 to 9 +int majorVersion = Integer.parseInt(System.getProperty("java.version").split("\\.")[0]); --- End diff -- From Java 9, here is a [new definition](https://docs.oracle.com/javase/9/migrate/toc.htm#JSMIG-GUID-3A71ECEF-5FC5-46FE-9BA9-88CBFCE828CB). I confirmed it can work for OpenJDK, OpenJ9, and IBM JDK 8 by running the following code ``` public class Version { public static void main(String[] args){ System.out.println("jave.specification.version=" + System.getProperty("java.specification.version")); System.out.println("jave.version=" + System.getProperty("java.version")); System.out.println("jave.version.split(\".\")[0]=" + System.getProperty("java.version").split("\\.")[0]); } } ``` OpenJDK ``` $ ../OpenJDK-8/java -version java version "1.8.0_162" Java(TM) SE Runtime Environment (build 1.8.0_162-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode) $ ../OpenJDK-8/java Version jave.specification.version=1.8 jave.version=1.8.0_162 jave.version.split(".")[0]=1 $ ../OpenJDK-9/java -version openjdk version "9" OpenJDK Runtime Environment (build 9+181) OpenJDK 64-Bit Server VM (build 9+181, mixed mode) $ ../OpenJDK-9/java Version jave.specification.version=9 jave.version=9 jave.version.split(".")[0]=9 $ ../OpenJDK-11/java -version openjdk version "11.0.1" 2018-10-16 OpenJDK Runtime Environment 18.9 (build 11.0.1+13) OpenJDK 64-Bit Server VM 18.9 (build 11.0.1+13, mixed mode) $ ../OpenJDK-11/java Version jave.specification.version=11 jave.version=11.0.1 jave.version.split(".")[0]=11 ``` OpenJ9 ``` $ ../OpenJ9-8/java -version openjdk version "1.8.0_192" OpenJDK Runtime Environment (build 1.8.0_192-b12) Eclipse OpenJ9 VM (build openj9-0.11.0, JRE 1.8.0 Windows 10 amd64-64-Bit Compressed References 20181019_105 (JIT enabled, AOT enabled) OpenJ9 - 090ff9dc OMR - ea548a66 JCL - 51609250b5 based on jdk8u192-b12) $ ../OpenJ9-8/java Version jave.specification.version=1.8 jave.version=1.8.0_192 jave.version.split(".")[0]=1 $ ../OpenJ9-9/java -version openjdk version "9.0.4-adoptopenjdk" OpenJDK Runtime Environment (build 9.0.4-adoptopenjdk+12) Eclipse OpenJ9 VM (build openj9-0.9.0, JRE 9 Windows 8.1 amd64-64-Bit Compressed References 20180814_161 (JIT enabled, AOT enabled) OpenJ9 - 24e53631 OMR - fad6bf6e JCL - feec4d2ae based on jdk-9.0.4+12) $ ../OpenJ9-9/java Version jave.specification.version=9 jave.version=9.0.4-adoptopenjdk jave.version.split(".")[0]=9 $ ../OpenJ9-11/java -version openjdk version "11.0.1" 2018-10-16 OpenJDK Runtime Environment AdoptOpenJDK (build 
11.0.1+13) Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.11.0, JRE 11 Windows 10 amd64-64-Bit Compressed References 20181020_83 (JIT enabled, AOT enabled) OpenJ9 - 090ff9dc OMR - ea548a66 JCL - f62696f378 based on jdk-11.0.1+13) $ ../OpenJ9-11/java Version jave.specification.version=11 jave.version=11.0.1 jave.version.split(".")[0]=11 ``` IBM JDK ``` $ ../IBMJDK-8/java -version java version "1.8.0" Java(TM) SE Runtime Environment (build pwa6480-20150129_02) IBM J9 VM (build 2.8, JRE 1.8.0 Windows 8.1 amd64-64 Compressed References
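As a companion to the version strings above, a minimal sketch (not the PR's code) of deriving the Java major version from `java.specification.version`, which follows the same `1.x` versus `x` naming split between JDK 8 and JDK 9+:
```
object JavaMajorVersion {
  def main(args: Array[String]): Unit = {
    val spec = System.getProperty("java.specification.version") // "1.8" on JDK 8, "9"/"11" on later JDKs
    val major = spec.split("\\.") match {
      case Array("1", minor, _*) => minor.toInt // legacy scheme: "1.8" -> 8
      case Array(m, _*)          => m.toInt     // JDK 9+ scheme: "11" -> 11
    }
    println(s"Java major version: $major")
  }
}
```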
[GitHub] spark issue #23005: [SPARK-26005] [SQL] Upgrade ANTRL from 4.7 to 4.7.1
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/23005 Files under `dev/deps/` should be updated, too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232453690 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. --- End diff -- nit: `arrow` -> `Arrow` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22998: [SPARK-26001][SQL]Reduce memory copy when writing decima...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22998 I have two questions. 1. Is this change already covered by the existing test `"SPARK-25538: zero-out all bits for decimals"`? 2. How does this PR achieve the performance improvement? It adds some complexity, so we would like to know the trade-off between performance and ease of understanding. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22976: [SPARK-25974][SQL]Optimizes Generates bytecode fo...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22976#discussion_r232443266 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala --- @@ -68,57 +68,50 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR genComparisons(ctx, ordering) } + /** + * Creates the variables for ordering based on the given order. + */ + private def createOrderKeys( +ctx: CodegenContext, +row: String, +ordering: Seq[SortOrder]): Seq[ExprCode] = { +ctx.INPUT_ROW = row +// to use INPUT_ROW we must make sure currentVars is null +ctx.currentVars = null +ordering.map(_.child.genCode(ctx)) + } + /** * Generates the code for ordering based on the given order. */ def genComparisons(ctx: CodegenContext, ordering: Seq[SortOrder]): String = { val oldInputRow = ctx.INPUT_ROW val oldCurrentVars = ctx.currentVars -val inputRow = "i" -ctx.INPUT_ROW = inputRow -// to use INPUT_ROW we must make sure currentVars is null -ctx.currentVars = null - -val comparisons = ordering.map { order => - val eval = order.child.genCode(ctx) - val asc = order.isAscending - val isNullA = ctx.freshName("isNullA") - val primitiveA = ctx.freshName("primitiveA") - val isNullB = ctx.freshName("isNullB") - val primitiveB = ctx.freshName("primitiveB") +val rowAKeys = createOrderKeys(ctx, "a", ordering) +val rowBKeys = createOrderKeys(ctx, "b", ordering) +val comparisons = rowAKeys.zip(rowBKeys).zipWithIndex.map { case ((l, r), i) => + val dt = ordering(i).child.dataType + val asc = ordering(i).isAscending + val nullOrdering = ordering(i).nullOrdering s""" - ${ctx.INPUT_ROW} = a; - boolean $isNullA; - ${CodeGenerator.javaType(order.child.dataType)} $primitiveA; - { -${eval.code} -$isNullA = ${eval.isNull}; -$primitiveA = ${eval.value}; - } - ${ctx.INPUT_ROW} = b; - boolean $isNullB; - ${CodeGenerator.javaType(order.child.dataType)} $primitiveB; - { -${eval.code} -$isNullB = ${eval.isNull}; -$primitiveB = ${eval.value}; - } - if ($isNullA && $isNullB) { + ${l.code} --- End diff -- Would you update this to use | and .stripMargin? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22976: [SPARK-25974][SQL]Optimizes Generates bytecode fo...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22976#discussion_r232443230 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala --- @@ -133,7 +126,6 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR returnType = "int", makeSplitFunction = { body => s""" --- End diff -- Would you update this to use `|` and `.stripMargin`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
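For reference, a tiny standalone sketch (not the generator code itself) of the `|` plus `.stripMargin` idiom requested in the two comments above; the margin keeps the embedded Java text readable without leaking the surrounding Scala indentation into the generated source:
```
object StripMarginDemo {
  def main(args: Array[String]): Unit = {
    val value = "sortKey" // illustrative variable name
    val generated =
      s"""
         |int $value = 0;
         |if ($value == 0) {
         |  $value = 1;
         |}
       """.stripMargin
    print(generated)
  }
}
```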
[GitHub] spark pull request #22976: [SPARK-25974][SQL]Optimizes Generates bytecode fo...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22976#discussion_r232443205 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala --- @@ -154,7 +146,6 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR // make sure INPUT_ROW is declared even if splitExpressions // returns an inlined block s""" --- End diff -- Can we use just `code`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22985: [SPARK-25510][SQL][TEST][FOLLOW-UP] Remove BenchmarkWith...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22985 LGTM, pending Jenkins --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22976: [SPARK-25974][SQL]Optimizes Generates bytecode for order...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22976 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22976: [SPARK-25974][SQL]Optimizes Generates bytecode fo...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22976#discussion_r231886019 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala --- @@ -68,57 +68,51 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR genComparisons(ctx, ordering) } + /** + * Creates the variables for ordering based on the given order. + */ + private def createOrderKeys( +ctx: CodegenContext, +row: String, +ordering: Seq[SortOrder]): Seq[ExprCode] = { +ctx.INPUT_ROW = row +ctx.currentVars = null +ordering.map(_.child.genCode(ctx)) + } + /** * Generates the code for ordering based on the given order. */ def genComparisons(ctx: CodegenContext, ordering: Seq[SortOrder]): String = { val oldInputRow = ctx.INPUT_ROW val oldCurrentVars = ctx.currentVars -val inputRow = "i" -ctx.INPUT_ROW = inputRow // to use INPUT_ROW we must make sure currentVars is null ctx.currentVars = null - -val comparisons = ordering.map { order => - val eval = order.child.genCode(ctx) - val asc = order.isAscending - val isNullA = ctx.freshName("isNullA") - val primitiveA = ctx.freshName("primitiveA") - val isNullB = ctx.freshName("isNullB") - val primitiveB = ctx.freshName("primitiveB") +val rowAKeys = createOrderKeys(ctx, "a", ordering) +val rowBKeys = createOrderKeys(ctx, "b", ordering) +val comparisons = rowAKeys.zip(rowBKeys).zipWithIndex.map { case ((l, r), i) => + val dt = ordering(i).child.dataType + val asc = ordering(i).isAscending + val nullOrdering = ordering(i).nullOrdering s""" - ${ctx.INPUT_ROW} = a; - boolean $isNullA; - ${CodeGenerator.javaType(order.child.dataType)} $primitiveA; - { -${eval.code} -$isNullA = ${eval.isNull}; -$primitiveA = ${eval.value}; - } - ${ctx.INPUT_ROW} = b; - boolean $isNullB; - ${CodeGenerator.javaType(order.child.dataType)} $primitiveB; - { -${eval.code} -$isNullB = ${eval.isNull}; -$primitiveB = ${eval.value}; - } - if ($isNullA && $isNullB) { + ${l.code} + ${r.code} + if (${l.isNull} && ${r.isNull}) { // Nothing - } else if ($isNullA) { + } else if (${l.isNull}) { return ${ - order.nullOrdering match { -case NullsFirst => "-1" -case NullsLast => "1" - }}; - } else if ($isNullB) { +nullOrdering match { --- End diff -- nit: indentation problem --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22976: [SPARK-25974][SQL]Optimizes Generates bytecode fo...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22976#discussion_r231885902 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala --- @@ -68,57 +68,51 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR genComparisons(ctx, ordering) } + /** + * Creates the variables for ordering based on the given order. + */ + private def createOrderKeys( +ctx: CodegenContext, +row: String, +ordering: Seq[SortOrder]): Seq[ExprCode] = { +ctx.INPUT_ROW = row +ctx.currentVars = null +ordering.map(_.child.genCode(ctx)) + } + /** * Generates the code for ordering based on the given order. */ def genComparisons(ctx: CodegenContext, ordering: Seq[SortOrder]): String = { val oldInputRow = ctx.INPUT_ROW val oldCurrentVars = ctx.currentVars -val inputRow = "i" -ctx.INPUT_ROW = inputRow // to use INPUT_ROW we must make sure currentVars is null ctx.currentVars = null --- End diff -- Now, can we remove this line? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22976: [SPARK-25974][SQL]Optimizes Generates bytecode fo...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22976#discussion_r231886071 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala --- @@ -68,57 +68,51 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR genComparisons(ctx, ordering) } + /** + * Creates the variables for ordering based on the given order. + */ + private def createOrderKeys( +ctx: CodegenContext, +row: String, +ordering: Seq[SortOrder]): Seq[ExprCode] = { +ctx.INPUT_ROW = row +ctx.currentVars = null +ordering.map(_.child.genCode(ctx)) + } + /** * Generates the code for ordering based on the given order. */ def genComparisons(ctx: CodegenContext, ordering: Seq[SortOrder]): String = { val oldInputRow = ctx.INPUT_ROW val oldCurrentVars = ctx.currentVars -val inputRow = "i" -ctx.INPUT_ROW = inputRow // to use INPUT_ROW we must make sure currentVars is null ctx.currentVars = null - -val comparisons = ordering.map { order => - val eval = order.child.genCode(ctx) - val asc = order.isAscending - val isNullA = ctx.freshName("isNullA") - val primitiveA = ctx.freshName("primitiveA") - val isNullB = ctx.freshName("isNullB") - val primitiveB = ctx.freshName("primitiveB") +val rowAKeys = createOrderKeys(ctx, "a", ordering) +val rowBKeys = createOrderKeys(ctx, "b", ordering) +val comparisons = rowAKeys.zip(rowBKeys).zipWithIndex.map { case ((l, r), i) => + val dt = ordering(i).child.dataType + val asc = ordering(i).isAscending + val nullOrdering = ordering(i).nullOrdering s""" - ${ctx.INPUT_ROW} = a; - boolean $isNullA; - ${CodeGenerator.javaType(order.child.dataType)} $primitiveA; - { -${eval.code} -$isNullA = ${eval.isNull}; -$primitiveA = ${eval.value}; - } - ${ctx.INPUT_ROW} = b; - boolean $isNullB; - ${CodeGenerator.javaType(order.child.dataType)} $primitiveB; - { -${eval.code} -$isNullB = ${eval.isNull}; -$primitiveB = ${eval.value}; - } - if ($isNullA && $isNullB) { + ${l.code} + ${r.code} + if (${l.isNull} && ${r.isNull}) { // Nothing - } else if ($isNullA) { + } else if (${l.isNull}) { return ${ - order.nullOrdering match { -case NullsFirst => "-1" -case NullsLast => "1" - }}; - } else if ($isNullB) { +nullOrdering match { + case NullsFirst => "-1" + case NullsLast => "1" +}}; + } else if (${r.isNull}) { return ${ - order.nullOrdering match { -case NullsFirst => "1" -case NullsLast => "-1" - }}; +nullOrdering match { --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22976: [SPARK-25974][SQL]Optimizes Generates bytecode for order...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22976 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22818: [SPARK-25904][CORE] Allocate arrays smaller than Int.Max...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22818 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22754: [SPARK-25776][CORE]The disk write buffer size must be gr...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22754 Thanks! merging to master --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22818: [SPARK-25904][CORE] Allocate arrays smaller than Int.Max...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22818 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22818: [SPARK-25904][CORE] Allocate arrays smaller than Int.Max...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22818 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22754: [SPARK-25776][CORE]The disk write buffer size must be gr...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22754 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22754: [SPARK-25776][CORE]The disk write buffer size must be gr...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22754 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22898: [SPARK-25746][SQL][followup] do not add unnecessary If e...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22898 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22847: [SPARK-25850][SQL] Make the split threshold for t...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22847#discussion_r229577559 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -812,6 +812,18 @@ object SQLConf { .intConf .createWithDefault(65535) + val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold") +.internal() +.doc("The threshold of source-code splitting in the codegen. When the number of characters " + + "in a single JAVA function (without comment) exceeds the threshold, the function will be " + --- End diff -- nit: `JAVA` -> `Java` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22847: [SPARK-25850][SQL] Make the split threshold for t...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22847#discussion_r229577345 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -812,6 +812,17 @@ object SQLConf { .intConf .createWithDefault(65535) + val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold") +.internal() +.doc("The threshold of source code length without comment of a single Java function by " + + "codegen to be split. When the generated Java function source code exceeds this threshold" + + ", it will be split into multiple small functions. We can't know how many bytecode will " + + "be generated, so use the code length as metric. A function's bytecode should not go " + + "beyond 8KB, otherwise it will not be JITted; it also should not be too small, otherwise " + + "there will be many function calls.") +.intConf --- End diff -- 1000 is conservative, but there is no single recommended value since the bytecode size depends on the content (e.g. `0`'s bytecode length is 1, while `9`'s bytecode length is 2). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
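For illustration, a hedged usage sketch of the knob under review: the key comes from the quoted diff (`spark.sql.codegen.methodSplitThreshold`, marked internal there), and the value below is purely illustrative given that, as noted above, no single recommended value exists.
```
import org.apache.spark.sql.SparkSession

object MethodSplitThresholdDemo {
  def main(args: Array[String]): Unit = {
    // Internal knob taken from the quoted diff; 1024 is an illustrative value only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("method-split-threshold-demo")
      .config("spark.sql.codegen.methodSplitThreshold", "1024")
      .getOrCreate()
    println(spark.conf.get("spark.sql.codegen.methodSplitThreshold"))
    spark.stop()
  }
}
```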
[GitHub] spark issue #22891: SPARK-25881
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22891 Thank you for your contribution. Could you please write an appropriate title and description based on http://spark.apache.org/contributing.html? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22881: [SPARK-25855][CORE] Don't use erasure coding for ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22881#discussion_r229155491 --- Diff: docs/configuration.md --- @@ -761,6 +761,17 @@ Apart from these, the following properties are also available, and may be useful Compression will use spark.io.compression.codec. + + spark.eventLog.allowErasureCoding + false + +Whether to allow event logs to use erasure coding, or turn erasure coding off, regardless of +filesystem defaults. On HDFS, erasure coded files will not update as quickly as regular +replicated files, so they application updates will take longer to appear in the History Server. +Note that even if this is true, spark will still not force the file to erasure coding, it will --- End diff -- nit: `to erasure coding` -> `to use erasure coding`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22881: [SPARK-25855][CORE] Don't use erasure coding for ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22881#discussion_r229154733 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -471,4 +473,42 @@ object SparkHadoopUtil { hadoopConf.set(key.substring("spark.hadoop.".length), value) } } + + + lazy val builderReflection: Option[(Class[_], Method, Method)] = Try { +val cls = Utils.classForName( + "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder") +(cls, cls.getMethod("replicate"), cls.getMethod("build")) + }.toOption + + // scalastyle:off line.size.limit + /** + * Create a path that uses replication instead of erasure coding, regardless of the default + * configuration in hdfs for the given path. This can be helpful as hdfs ec doesn't support --- End diff -- nit: `ec` -> `erasure coding` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
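As a side note, a generic, self-contained sketch of the lazy reflection pattern visible in the quoted diff: resolve an optional class and method once, keep the result as an `Option`, and degrade gracefully when the API is absent. The class and method names below are hypothetical, not HDFS APIs.
```
import java.lang.reflect.Method
import scala.util.Try

object OptionalReflection {
  // Resolved once; None when the optional API is not on the classpath.
  lazy val builderReflection: Option[(Class[_], Method)] = Try {
    val cls = Class.forName("org.example.SomeOutputStreamBuilder") // hypothetical class name
    (cls, cls.getMethod("build"))
  }.toOption

  def main(args: Array[String]): Unit =
    println(builderReflection.fold("optional builder API not available")(_ => "optional builder API found"))
}
```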
[GitHub] spark pull request #22877: [MINOR][SQL] Avoid hardcoded configuration keys i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22877#discussion_r229148363 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -408,15 +408,16 @@ object SQLConf { val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date") .doc("If true, enables Parquet filter push-down optimization for Date. " + - "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.") + s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " + --- End diff -- Got it, thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22879: [SPARK-25872][SQL][TEST] Add an optimizer tracker for TP...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22879 cc @maropu --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22877: [MINOR][SQL] Avoid hardcoded configuration keys i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22877#discussion_r229034778 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -408,15 +408,16 @@ object SQLConf { val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date") .doc("If true, enables Parquet filter push-down optimization for Date. " + - "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.") + s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " + --- End diff -- nit: Can we apply the same policy to `spark.sql.parquet.compression.codec` at L397? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
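For illustration, a small self-contained sketch (outside Spark's SQLConf, with made-up entry names) of the pattern being reviewed: one entry's doc string references another entry's `.key` instead of hardcoding the string, so a key rename cannot leave the documentation stale.
```
object ConfKeyReferenceDemo {
  final case class ConfEntry(key: String, doc: String)

  // Hypothetical entries; only the cross-reference style mirrors the review comment.
  val FILTER_PUSHDOWN_ENABLED: ConfEntry =
    ConfEntry("example.parquet.filterPushdown", "If true, enables Parquet filter push-down.")

  val FILTER_PUSHDOWN_DATE_ENABLED: ConfEntry = ConfEntry(
    "example.parquet.filterPushdown.date",
    s"This configuration only has an effect when '${FILTER_PUSHDOWN_ENABLED.key}' is enabled.")

  def main(args: Array[String]): Unit = println(FILTER_PUSHDOWN_DATE_ENABLED.doc)
}
```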
[GitHub] spark issue #22755: [SPARK-25755][SQL][Test] Supplementation of non-CodeGen ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22755 Would it be better to apply this util method to other suites as well (e.g. `DataFrameRangeSuite.scala` and `DataFrameAggregateSuite.scala`)? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22818: [SPARK-25827][CORE] Allocate arrays smaller than Int.Max...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22818 Since this PR is not a blocker for 2.4, I think it would be good to address these issues where possible. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22754: [SPARK-25776][CORE]The disk write buffer size mus...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22754#discussion_r228738630 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java --- @@ -42,7 +42,9 @@ private final SparkConf conf = new SparkConf(); - /** The buffer size to use when writing the sorted records to an on-disk file */ + /** The buffer size to use when writing the sorted records to an on-disk file, and --- End diff -- nit: In a multi-line comment, the starting `/**` line should not contain any text. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
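For clarity, a tiny sketch of the comment style being requested (Scala shown; the wording is adapted and the constant is hypothetical): in a multi-line doc comment, the opening `/**` line carries no text.
```
/**
 * The buffer size to use when writing the sorted records to an on-disk file
 * (wording adapted from the quoted diff; the exact constraint lives in the PR).
 */
object DiskWriteBufferDoc {
  val IllustrativeDiskWriteBufferSize: Int = 1024 * 1024 // hypothetical value
}
```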
[GitHub] spark issue #19601: [SPARK-22383][SQL] Generate code to directly get value o...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19601 Sure, let me close this --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19601: [SPARK-22383][SQL] Generate code to directly get ...
Github user kiszk closed the pull request at: https://github.com/apache/spark/pull/19601 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org