[GitHub] spark pull request #19858: [SPARK-22489][DOC][FOLLOWUP] Update broadcast beh...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19858 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r154576646 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ + +/** + * Defines APIs used in expression code generation. + */ +object ExpressionCodegen { + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred bu children expressions. 
+ */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val paramsFromSubExprs = prepareFunctionParams(ctx, subExprs, subExprCodes) --- End diff -- Do we support subexpression elimination when splitting code into methods in the non-whole-stage-codegen path?
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154576368 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -573,94 +574,84 @@ case class HashAggregateExec( enableTwoLevelHashMap(ctx) } else { sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", null) match { -case "true" => logWarning("Two level hashmap is disabled but vectorized hashmap is " + - "enabled.") -case null | "" | "false" => None +case "true" => + logWarning("Two level hashmap is disabled but vectorized hashmap is enabled.") +case _ => } } -fastHashMapTerm = ctx.freshName("fastHashMap") -val fastHashMapClassName = ctx.freshName("FastHashMap") -val fastHashMapGenerator = - if (isVectorizedHashMapEnabled) { -new VectorizedHashMapGenerator(ctx, aggregateExpressions, - fastHashMapClassName, groupingKeySchema, bufferSchema) - } else { -new RowBasedHashMapGenerator(ctx, aggregateExpressions, - fastHashMapClassName, groupingKeySchema, bufferSchema) - } val thisPlan = ctx.addReferenceObj("plan", this) -// Create a name for iterator from vectorized HashMap +// Create a name for the iterator from the fast hash map. val iterTermForFastHashMap = ctx.freshName("fastHashMapIter") if (isFastHashMapEnabled) { + // Generates the fast hash map class and creates the fash hash map term. + fastHashMapTerm = ctx.freshName("fastHashMap") + val fastHashMapClassName = ctx.freshName("FastHashMap") if (isVectorizedHashMapEnabled) { +val generatedMap = new VectorizedHashMapGenerator(ctx, aggregateExpressions, + fastHashMapClassName, groupingKeySchema, bufferSchema).generate() +ctx.addInnerClass(generatedMap) + ctx.addMutableState(fastHashMapClassName, fastHashMapTerm, s"$fastHashMapTerm = new $fastHashMapClassName();") ctx.addMutableState( - "java.util.Iterator", + classOf[java.util.Iterator[ColumnarRow]].getName, --- End diff -- Is this as same as before? 
```scala
scala> classOf[java.util.Iterator[Int]].getName
res2: String = java.util.Iterator
```
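The point behind the question can be checked outside the REPL as well. The following is a minimal sketch, assuming only the standard JVM behaviour that generic type arguments are erased at runtime; the object name `ErasureCheck` is hypothetical and not part of the PR.

```scala
object ErasureCheck {
  // Type arguments are erased at runtime, so every parameterization of
  // java.util.Iterator resolves to the same Class object and the same name.
  val rawName: String = classOf[java.util.Iterator[_]].getName
  val intName: String = classOf[java.util.Iterator[Int]].getName
  val stringName: String = classOf[java.util.Iterator[String]].getName

  def main(args: Array[String]): Unit = {
    println(intName)
    println(intName == stringName)
  }
}
```

If this holds, replacing the string literal `"java.util.Iterator"` with a `classOf[...]` expression on a parameterized iterator type should yield the same class name passed to `ctx.addMutableState`.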
[GitHub] spark issue #19858: [SPARK-22489][DOC][FOLLOWUP] Update broadcast behavior c...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19858 Thanks! Merged to master.
[GitHub] spark issue #19858: [SPARK-22489][DOC][FOLLOWUP] Update broadcast behavior c...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19858 LGTM
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r154575849 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ + +/** + * Defines APIs used in expression code generation. + */ +object ExpressionCodegen { + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred bu children expressions. 
+ */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val paramsFromSubExprs = prepareFunctionParams(ctx, subExprs, subExprCodes) + +val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) +val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => + (row, s"InternalRow $row") +} + +val paramsLength = getParamLength(ctx, inputAttrs, subExprs) + paramsFromRows.length +// Maximum allowed parameter number for Java's method descriptor. +if (paramsLength > 255) { + None +} else { + val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip + val callParams = allParams._1.distinct + val declParams = allParams._2.distinct + Some((callParams, declParams)) +} + } + + /** + * Returns the eliminated subexpressions in the children expressions. + */ + def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { +expr.children.flatMap { child => + child.collect { +case e if ctx.subExprEliminationExprs.contains(e) => e + } +}.distinct + } + + /** + * A small helper function to return `ExprCode`s that represent subexpressions. + */ + def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { +subExprs.map { subExpr => + val stat = ctx.subExprEliminationExprs(subExpr) + ExprCode(code = "", value = stat.value, isNull = stat.isNull) +} + } + + /** + * Retrieves previous input rows referred by children and deferred expressions. 
+ */ + def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): Seq[String] = { +expr.children.flatMap(getInputRows(ctx, _)).distinct + } + + /** + * Given a child expression, retrieves previous input rows referred by it or deferred expressions + * which are needed to evaluate it. + */ + def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = { +child.flatMap { + // An expression directly evaluates on current input row. + case BoundReference(ordinal, _, _) if ctx.currentVars == null || + ctx.currentVars(ordinal) == null => +Seq(ctx.INPUT_ROW) + + // An expression which is
[GitHub] spark pull request #19841: [SPARK-22642][SQL] the createdTempDir will not be...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19841#discussion_r154575693 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala --- @@ -104,14 +105,61 @@ case class InsertIntoHiveTable( val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty) -// By this time, the partition map must match the table's partition columns -if (partitionColumnNames.toSet != partition.keySet) { - throw new SparkException( -s"""Requested partitioning does not match the ${table.identifier.table} table: - |Requested partitions: ${partition.keys.mkString(",")} - |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin) +try { + // By this time, the partition map must match the table's partition columns + if (partitionColumnNames.toSet != partition.keySet) { +throw new SparkException( + s"""Requested partitioning does not match the ${table.identifier.table} table: + |Requested partitions: ${partition.keys.mkString(",")} + |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin) + } + + validatePartitionSpec(hadoopConf, numDynamicPartitions, numStaticPartitions, +partitionSpec, partitionColumnNames) + + validateBucketSpec(hadoopConf) + + val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name => +query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse { + throw new AnalysisException( +s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]") +}.asInstanceOf[Attribute] + } + --- End diff --
```Scala
try {
```
should start from this line, right? No need to create `validatePartitionSpec` and `validateBucketSpec` in this PR. We want to minimize the code changes.
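As a rough illustration of why the position of `try {` matters, here is a minimal sketch of the general pattern, assuming the aim is that a temporary directory created before a failure is still removed. `TempDirCleanup` and `withTempDir` are hypothetical names and do not appear in the PR; the sketch only deletes flat files inside the directory.

```scala
import java.nio.file.{Files, Path}

object TempDirCleanup {
  // Hypothetical helper: creates a temp directory, runs `body`, and always
  // removes the directory afterwards, even when `body` throws.
  def withTempDir[T](prefix: String)(body: Path => T): T = {
    // Nothing before this line needs protection: no resource exists yet.
    val dir = Files.createTempDirectory(prefix)
    try {
      body(dir)
    } finally {
      // Sketch only: removes direct children, then the directory itself.
      val children = Option(dir.toFile.listFiles()).getOrElse(Array.empty[java.io.File])
      children.foreach(_.delete())
      Files.deleteIfExists(dir)
    }
  }
}
```

Under this reading, validation that runs before the resource is created can stay outside the `try`, which keeps the diff small.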
[GitHub] spark issue #19874: [SPARK-22675] [SQL] Refactoring PropagateTypes in TypeCo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19874 **[Test build #84418 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84418/testReport)** for PR 19874 at commit [`657cf88`](https://github.com/apache/spark/commit/657cf88bbf4259d1a823f93f16eaccc2fbe78667).
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r154575588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala --- @@ -108,7 +108,10 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { |}""".stripMargin) ctx.currentVars = null +// `rowIdx` isn't in `ctx.currentVars`. If the expressions are split later, we can't track it. +// So making it as global variable. --- End diff -- Can we add `rowIdx` to `ExprCode.inputVars` in `genCodeColumnVector`?
[GitHub] spark pull request #17176: [SPARK-19833][SQL]remove SQLConf.HIVE_VERIFY_PART...
Github user barrenlake commented on a diff in the pull request: https://github.com/apache/spark/pull/17176#discussion_r154575331 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala --- @@ -159,36 +159,11 @@ class HadoopTableReader( def verifyPartitionPath( partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]): Map[HivePartition, Class[_ <: Deserializer]] = { - if (!sparkSession.sessionState.conf.verifyPartitionPath) { -partitionToDeserializer - } else { -var existPathSet = collection.mutable.Set[String]() -var pathPatternSet = collection.mutable.Set[String]() -partitionToDeserializer.filter { - case (partition, partDeserializer) => -def updateExistPathSetByPathPattern(pathPatternStr: String) { - val pathPattern = new Path(pathPatternStr) - val fs = pathPattern.getFileSystem(hadoopConf) - val matches = fs.globStatus(pathPattern) - matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString) -} -// convert /demo/data/year/month/day to /demo/data/*/*/*/ -def getPathPatternByPath(parNum: Int, tempPath: Path): String = { - var path = tempPath - for (i <- (1 to parNum)) path = path.getParent - val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/") - path.toString + tails -} - -val partPath = partition.getDataLocation -val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size(); -var pathPatternStr = getPathPatternByPath(partNum, partPath) -if (!pathPatternSet.contains(pathPatternStr)) { - pathPatternSet += pathPatternStr - updateExistPathSetByPathPattern(pathPatternStr) -} -existPathSet.contains(partPath.toString) -} + partitionToDeserializer.filter { +case (partition, partDeserializer) => + val partPath = partition.getDataLocation + val fs = partPath.getFileSystem(hadoopConf) + fs.exists(partPath) --- End diff -- Each partition sending an RPC request to the NameNode can result in poor performance --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: 
reviews-h...@spark.apache.org
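The removed code avoided one existence check per partition by collapsing partition paths into a glob pattern and listing each pattern once. Below is a minimal sketch of that idea on plain strings, with no Hadoop dependency; `PartitionGlob`, `toGlob` and `distinctGlobs` are hypothetical names, and unlike the removed `getPathPatternByPath` the pattern here has no trailing slash.

```scala
object PartitionGlob {
  // Turns /demo/data/2017/12/04 with 3 partition columns into /demo/data/*/*/*
  def toGlob(partitionPath: String, numPartitionColumns: Int): String = {
    val segments = partitionPath.split("/").filter(_.nonEmpty)
    require(numPartitionColumns >= 0 && numPartitionColumns <= segments.length,
      s"cannot strip $numPartitionColumns levels from $partitionPath")
    val base = segments.dropRight(numPartitionColumns)
    (base ++ Seq.fill(numPartitionColumns)("*")).mkString("/", "/", "")
  }

  // One listing call per distinct pattern instead of one call per partition.
  def distinctGlobs(partitionPaths: Seq[String], numPartitionColumns: Int): Set[String] =
    partitionPaths.map(toGlob(_, numPartitionColumns)).toSet
}
```

For a table whose partitions all share one root, `distinctGlobs` returns a single pattern, so the number of round trips no longer grows with the partition count. Whether a single large listing is cheaper than many small checks depends on the file system, which this sketch does not measure.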
[GitHub] spark pull request #19874: [SPARK-22675] [SQL] Refactoring PropagateTypes in...
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/19874 [SPARK-22675] [SQL] Refactoring PropagateTypes in TypeCoercion ## What changes were proposed in this pull request? PropagateTypes is called twice in TypeCoercion. We do not need to call it twice. Instead, we should call it after each change to the types. ## How was this patch tested? The existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/gatorsmile/spark deduplicatePropagateTypes Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19874.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19874 commit fc3a24e039c905fdd263d2ece2088ce152887fc7 Author: gatorsmile Date: 2017-12-03T05:55:00Z clean commit 657cf88bbf4259d1a823f93f16eaccc2fbe78667 Author: gatorsmile Date: 2017-12-04T07:40:26Z refactor
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19869 Merged build finished. Test FAILed.
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19869 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84416/ Test FAILed.
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19869 **[Test build #84416 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84416/testReport)** for PR 19869 at commit [`479fd8d`](https://github.com/apache/spark/commit/479fd8d4cb6ba6f0126d996362125746dfeac8f2). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r154574087 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ + +/** + * Defines APIs used in expression code generation. + */ +object ExpressionCodegen { + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred bu children expressions. 
+ */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val paramsFromSubExprs = prepareFunctionParams(ctx, subExprs, subExprCodes) + +val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) +val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => + (row, s"InternalRow $row") +} + +val paramsLength = getParamLength(ctx, inputAttrs, subExprs) + paramsFromRows.length +// Maximum allowed parameter number for Java's method descriptor. +if (paramsLength > 255) { + None +} else { + val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip + val callParams = allParams._1.distinct + val declParams = allParams._2.distinct + Some((callParams, declParams)) +} + } + + /** + * Returns the eliminated subexpressions in the children expressions. + */ + def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { +expr.children.flatMap { child => + child.collect { +case e if ctx.subExprEliminationExprs.contains(e) => e + } +}.distinct + } + + /** + * A small helper function to return `ExprCode`s that represent subexpressions. + */ + def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { +subExprs.map { subExpr => + val stat = ctx.subExprEliminationExprs(subExpr) --- End diff -- nit: what does `stat` mean? In general, it is related to `statistics`. Is it better to use another variable name? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
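The `paramsLength > 255` check in the diff reflects the JVM rule that a method descriptor may describe at most 255 parameter units, where `long` and `double` count as two units each and `this` counts as one for instance methods. A minimal sketch of that accounting follows; `ParamSlots`, `slotsFor` and `fitsInOneMethod` are hypothetical names, and this is not the `getParamLength` helper from the PR, whose body is not shown here.

```scala
object ParamSlots {
  // long and double occupy two units in a JVM method descriptor;
  // every other primitive or reference type occupies one.
  def slotsFor(javaType: String): Int = javaType match {
    case "long" | "double" => 2
    case _ => 1
  }

  // True if a method with these parameter types stays within the 255-unit limit.
  def fitsInOneMethod(paramTypes: Seq[String], isInstanceMethod: Boolean = true): Boolean = {
    val thisSlot = if (isInstanceMethod) 1 else 0
    paramTypes.map(slotsFor).sum + thisSlot <= 255
  }
}
```

Counting units rather than parameters matters for wide schemas of `long` or `double` columns: 128 `long` parameters already exceed the limit on an instance method even though the parameter count is well below 255.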
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r154573755 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -1001,16 +1017,25 @@ class CodegenContext { commonExprs.foreach { e => val expr = e.head val fnName = freshName("evalExpr") - val isNull = s"${fnName}IsNull" + val isNull = if (expr.nullable) { +s"${fnName}IsNull" + } else { +"" + } val value = s"${fnName}Value" // Generate the code for this expression tree and wrap it in a function. val eval = expr.genCode(this) + val nullValue = if (expr.nullable) { +s"$isNull = ${eval.isNull};" --- End diff -- nit:
```
val setIsNull = if (expr.nullable) {
  s"${fnName}IsNull = ${eval.isNull};"
} else ""
```
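To make the suggested shape concrete, here is a minimal standalone sketch that emits the null-flag assignment only for nullable expressions. `NullableCodegen`, `Eval` and `assignments` are hypothetical stand-ins for illustration; they are not Spark's `ExprCode` or `CodegenContext`.

```scala
object NullableCodegen {
  // Hypothetical stand-in for the names of the generated isNull/value variables.
  final case class Eval(isNull: String, value: String)

  // Builds the assignment statements for a subexpression function body.
  // For a non-nullable expression the isNull assignment is skipped entirely.
  def assignments(fnName: String, nullable: Boolean, eval: Eval): String = {
    val setIsNull = if (nullable) s"${fnName}IsNull = ${eval.isNull};" else ""
    val setValue = s"${fnName}Value = ${eval.value};"
    Seq(setIsNull, setValue).filter(_.nonEmpty).mkString("\n")
  }
}
```

Naming the fragment `setIsNull` describes what the generated statement does, which is arguably clearer than `nullValue`.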
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154572754 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.ServiceBuilder + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec +import org.apache.spark.internal.Logging +import org.apache.spark.util.Clock + +/** + * Allows the driver to be reachable by executor pods through a headless service. The service's + * ports should correspond to the ports that the executor will reach the pod at for RPC. 
+ */ +private[spark] class DriverServiceBootstrapStep( +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +submissionSparkConf: SparkConf, +clock: Clock) extends DriverConfigurationStep with Logging { + import DriverServiceBootstrapStep._ + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty, + s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " + + "address is managed and set to the driver pod's IP address.") +require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty, + s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " + + "managed via a Kubernetes service.") + +val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX" +val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { + preferredServiceName +} else { + val randomServiceId = clock.getTimeMillis() + val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" + logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + +s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " + +s"$shorterServiceName as the driver service's name.") + shorterServiceName +} + +val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT) +val driverBlockManagerPort = submissionSparkConf.getInt( +org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) +val driverService = new ServiceBuilder() + .withNewMetadata() +.withName(resolvedServiceName) +.endMetadata() + .withNewSpec() +.withClusterIP("None") +.withSelector(driverLabels.asJava) +.addNewPort() + .withName(DRIVER_PORT_NAME) + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() +.addNewPort() + .withName(BLOCK_MANAGER_PORT_NAME) + .withPort(driverBlockManagerPort) + .withNewTargetPort(driverBlockManagerPort) + .endPort() +.endSpec() + .build() + +val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE) +val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local" +val resolvedSparkConf = driverSpec.driverSparkConf.clone() + .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname) --- End diff -- I meant isn't `org.apache.spark.internal.config.DRIVER_HOST_ADDRESS` just `DRIVER_HOST_KEY`? --- - To unsubscribe, e-mail:
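The naming fallback in `configureDriver` can be sketched in isolation as below. The constant values are assumptions made for illustration (63 is the Kubernetes DNS label limit, and `-driver-svc` is a guessed postfix); the diff does not show the real values of `MAX_SERVICE_NAME_LENGTH` or `DRIVER_SVC_POSTFIX`, and `DriverServiceName` is a hypothetical name.

```scala
object DriverServiceName {
  // Assumed values, not taken from the diff.
  val MaxServiceNameLength = 63
  val DriverSvcPostfix = "-driver-svc"

  // Prefers "<prefix><postfix>"; falls back to a timestamp-based name when too long.
  def resolve(resourceNamePrefix: String, nowMillis: Long): String = {
    val preferred = s"$resourceNamePrefix$DriverSvcPostfix"
    if (preferred.length <= MaxServiceNameLength) preferred
    else s"spark-$nowMillis$DriverSvcPostfix"
  }

  // Cluster-internal DNS name of the headless service, as built in the diff.
  def hostname(serviceName: String, namespace: String): String =
    s"$serviceName.$namespace.svc.cluster.local"
}
```

Passing the timestamp in, as the diff does through `Clock`, keeps the fallback name deterministic in tests.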
[GitHub] spark issue #19873: [SPARK-20392][SQL] Set barrier to prevent re-entering a ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19873 cc @cloud-fan @hvanhovell Basically these are the same changes as in #17770.
[GitHub] spark issue #19873: [SPARK-20392][SQL] Set barrier to prevent re-entering a ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19873 **[Test build #84417 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84417/testReport)** for PR 19873 at commit [`136fd30`](https://github.com/apache/spark/commit/136fd30cb98609e648a8b689a28853c1bae67bf7).
[GitHub] spark pull request #19873: [SPARK-20392][SQL] Set barrier to prevent re-ente...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/19873 [SPARK-20392][SQL] Set barrier to prevent re-entering a tree ## What changes were proposed in this pull request? It is reported that there is a performance degradation when applying an ML pipeline to a dataset with many columns but few rows. A big part of the degradation comes from operations (e.g., `select`) on DataFrame/Dataset that re-create a new DataFrame/Dataset with a new `LogicalPlan`. In ordinary SQL usage this cost is normally negligible. However, it is not rare to chain dozens of pipeline stages in ML. As the query plan grows incrementally while those stages run, the total cost of re-creating DataFrames grows too. In particular, the `Analyzer` traverses the whole query plan even when most of it is already analyzed. By eliminating part of the cost, the time to run the example code locally is reduced from about 1 min to about 30 secs. In particular, the time spent applying the pipeline locally goes mostly to calling transform on the 137 `Bucketizer`s. Before the change, each call of `Bucketizer`'s transform costs about 0.4 sec, so the total time spent on all `Bucketizer`s' transform is about 50 secs. After the change, each call costs only about 0.1 sec. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 SPARK-20392-reopen Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19873.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19873 commit 136fd30cb98609e648a8b689a28853c1bae67bf7 Author: Liang-Chi Hsieh Date: 2017-12-04T07:19:35Z Add analysis barrier around analyzed plans.
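The cost argument in the description can be illustrated with a toy model. This is a sketch under stated assumptions, not Spark's implementation: `Plan`, `Leaf`, `Project`, `Barrier`, `visited` and `totalVisits` are hypothetical names, and the "analyzer" here only counts the nodes it would touch.

```scala
object BarrierSketch {
  sealed trait Plan
  final case class Leaf(name: String) extends Plan
  final case class Project(child: Plan) extends Plan
  // Marks a subtree as already analyzed; the toy analyzer does not descend below it.
  final case class Barrier(child: Plan) extends Plan

  // Number of nodes a full traversal touches, stopping at barriers.
  def visited(plan: Plan): Int = plan match {
    case Leaf(_) => 1
    case Project(child) => 1 + visited(child)
    case Barrier(_) => 1
  }

  // Chains `stages` projections, re-analyzing the whole plan after each stage,
  // and returns the total number of node visits across all stages.
  def totalVisits(stages: Int, useBarrier: Boolean): Int = {
    var plan: Plan = Leaf("t")
    var total = 0
    for (_ <- 1 to stages) {
      val analyzedSoFar = if (useBarrier) Barrier(plan) else plan
      plan = Project(analyzedSoFar)
      total += visited(plan)
    }
    total
  }
}
```

Without a barrier the total grows quadratically with the number of stages, because each stage re-walks everything built before it; with a barrier each stage touches a constant number of nodes. That matches the shape of the slowdown described for long chains of pipeline stages, though the real timings depend on much more than node counts.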
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r154571348 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -115,9 +120,35 @@ abstract class Expression extends TreeNode[Expression] { } } + /** + * Returns the input variables to this expression. + */ + private def findInputVars(ctx: CodegenContext, eval: ExprCode): Seq[ExprInputVar] = { +if (ctx.currentVars != null) { + val boundRefs = this.collect { +case b @ BoundReference(ordinal, _, _) if ctx.currentVars(ordinal) != null => (ordinal, b) --- End diff -- Shall we add an assert here to guarantee that `ctx.currentVars(ordinal).code` is empty?
[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...
Github user zuotingbing commented on the issue: https://github.com/apache/spark/pull/19841 I extracted a separate function, but it has too many parameters. Could I extract several separate functions instead?
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19869 **[Test build #84416 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84416/testReport)** for PR 19869 at commit [`479fd8d`](https://github.com/apache/spark/commit/479fd8d4cb6ba6f0126d996362125746dfeac8f2). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19869 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84410/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19869 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19869 **[Test build #84410 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84410/testReport)** for PR 19869 at commit [`9b8ae3d`](https://github.com/apache/spark/commit/9b8ae3d6635c5ed0323bf088e20d0de55dd1c098). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r154569899 --- Diff: python/pyspark/sql/group.py --- @@ -89,8 +89,15 @@ def agg(self, *exprs): else: # Columns assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column" -jdf = self._jgd.agg(exprs[0]._jc, -_to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]])) +if isinstance(exprs[0], UDFColumn): +assert all(isinstance(c, UDFColumn) for c in exprs) --- End diff -- An informative error message would be better. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r154569953 --- Diff: python/pyspark/sql/group.py --- @@ -89,8 +89,15 @@ def agg(self, *exprs): else: # Columns assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column" -jdf = self._jgd.agg(exprs[0]._jc, -_to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]])) +if isinstance(exprs[0], UDFColumn): +assert all(isinstance(c, UDFColumn) for c in exprs) --- End diff -- For example, `"all exprs should be UDFColumn"`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r154569884 --- Diff: python/pyspark/sql/group.py --- @@ -89,8 +89,15 @@ def agg(self, *exprs): else: # Columns assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column" -jdf = self._jgd.agg(exprs[0]._jc, -_to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]])) +if isinstance(exprs[0], UDFColumn): +assert all(isinstance(c, UDFColumn) for c in exprs) +jdf = self._jgd.aggInPandas( +_to_seq(self.sql_ctx._sc, [c._jc for c in exprs])) +else: +jdf = self._jgd.agg(exprs[0]._jc, --- End diff -- What if `exprs[n]` (n > 0) is a `UDFColumn`? I think we should make sure that if any column is a `UDFColumn`, all columns are `UDFColumn`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
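The review comments on `agg` above ask for two things: an informative error message, and an all-or-none check that also catches a `UDFColumn` appearing after position 0. A minimal sketch of such a check follows. `Column` and `UDFColumn` here are bare stand-in classes and `choose_agg_path` is a hypothetical helper; none of this is the code in the PR.

```python
class Column:
    """Stand-in for pyspark.sql.Column (hypothetical, for illustration only)."""

    def __init__(self, name):
        self.name = name


class UDFColumn(Column):
    """Stand-in for the UDFColumn type that appears in the diff."""


def choose_agg_path(exprs):
    """Return 'pandas' if every expr is a UDFColumn, 'regular' if none is.

    Raises ValueError with an explicit message when the two kinds are mixed,
    regardless of which position the UDFColumn appears in.
    """
    if not exprs:
        raise ValueError("exprs should not be empty")
    if not all(isinstance(c, Column) for c in exprs):
        raise ValueError("all exprs should be Column")
    udf_flags = [isinstance(c, UDFColumn) for c in exprs]
    if any(udf_flags) and not all(udf_flags):
        raise ValueError(
            "all exprs should be UDFColumn if any of them is a UDFColumn"
        )
    return "pandas" if udf_flags[0] else "regular"


print(choose_agg_path([Column("a"), Column("b")]))
print(choose_agg_path([UDFColumn("f"), UDFColumn("g")]))
```

Checking `any(...)` before `all(...)` means the mixed case is rejected even when `exprs[0]` is a plain `Column`, which is the situation the last comment points out. The sketch raises `ValueError` rather than using `assert`, since assertions are stripped when Python runs with `-O`; that is a design choice of this sketch, not something the reviewers requested.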
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154569858 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -768,12 +762,8 @@ case class HashAggregateExec( // generate hash code for key val hashExpr = Murmur3Hash(groupingExpressions, 42) -ctx.currentVars = input val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx) -val inputAttr = aggregateBufferAttributes ++ child.output -ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input - val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter, incCounter) = if (testFallbackStartsAt.isDefined) { --- End diff -- ok, thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154569741 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -768,12 +762,8 @@ case class HashAggregateExec( // generate hash code for key val hashExpr = Murmur3Hash(groupingExpressions, 42) -ctx.currentVars = input val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx) -val inputAttr = aggregateBufferAttributes ++ child.output -ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input - val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter, incCounter) = if (testFallbackStartsAt.isDefined) { --- End diff -- I'll have another PR for this part, will reformat it at that time --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19872 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84415/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19872 **[Test build #84415 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84415/testReport)** for PR 19872 at commit [`a1058b8`](https://github.com/apache/spark/commit/a1058b8f91bc1093ef231bf41d6553d045788abc). * This patch **fails Python style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19872 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19872 **[Test build #84415 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84415/testReport)** for PR 19872 at commit [`a1058b8`](https://github.com/apache/spark/commit/a1058b8f91bc1093ef231bf41d6553d045788abc). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19840: [SPARK-22640][PYSPARK][YARN]switch python exec on execut...
Github user yaooqinn commented on the issue: https://github.com/apache/spark/pull/19840 @ueshin Case 8 should be client deploy mode. Sorry for the copy mistake; fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r154569177 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -113,6 +113,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { def apply(plan: SparkPlan): SparkPlan = plan transformUp { // FlatMapGroupsInPandas can be evaluated directly in python worker // Therefore we don't need to extract the UDFs --- End diff -- `FlatMapGroupsInPandas` and `AggregateInPandasExec` can be... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154569020 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh --- @@ -0,0 +1,37 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# echo commands to the terminal output +set -x --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154568773 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -590,6 +604,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | the node running the Application Master via the Secure | Distributed Cache, for renewing the login tickets and the | delegation tokens periodically. +| +| Kubernetes only: +| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the +| application must be launched. The namespace must already +| exist in the cluster. (Default: default). --- End diff -- Actually k8s does not yet support `kill` and `status`, nor does it support `spark.cores.max` yet. Updated `validateKillArguments` and `validateStatusRequestArguments`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19840: [SPARK-22640][PYSPARK][YARN]switch python exec on execut...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19840 @yaooqinn What's the difference between case 7 and 8? They look like the same configuration but with a different result? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154568554 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java --- @@ -159,6 +154,12 @@ public boolean hasNext() { @Override public UnsafeRow next() { try { +if (!alreadyCalculated) { --- End diff -- When `hasNext` is called, won't `sortedIterator` report that it has no elements, since we haven't added any rows to it yet? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154568281 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java --- @@ -159,6 +154,12 @@ public boolean hasNext() { @Override public UnsafeRow next() { try { +if (!alreadyCalculated) { + while (inputIterator.hasNext()) { +insertRow(inputIterator.next()); + } + alreadyCalculated = true; +} sortedIterator.loadNext(); --- End diff -- `sortedIterator` is already assigned at L143. If the rows are only inserted on the first call to `next`, can `sortedIterator` still return the sorted elements correctly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
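The concern raised in the two comments on `UnsafeExternalRowSorter` can be shown with a toy sorter. This sketch assumes the sorted iterator is a snapshot of whatever was inserted at the moment it was created; whether Spark's sorter behaves that way is exactly what the reviewer is asking, so treat this as an illustration of the question, not an answer. `ToySorter` and both helper functions are hypothetical.

```python
class ToySorter:
    """Hypothetical in-memory sorter; not Spark's UnsafeExternalSorter."""

    def __init__(self):
        self._rows = []

    def insert(self, row):
        self._rows.append(row)

    def sorted_iterator(self):
        # Snapshot: only rows inserted before this call are visible.
        return iter(sorted(self._rows))


def sort_with_early_iterator(input_rows):
    """Creates the iterator first and inserts rows afterwards."""
    sorter = ToySorter()
    it = sorter.sorted_iterator()
    for row in input_rows:  # too late: the snapshot above was taken while empty
        sorter.insert(row)
    return list(it)


def sort_with_deferred_iterator(input_rows):
    """Defers both insertion and iterator creation until first consumption."""

    def generate():
        sorter = ToySorter()
        for row in input_rows:
            sorter.insert(row)
        yield from sorter.sorted_iterator()

    return generate()


print(sort_with_early_iterator([3, 1, 2]))           # snapshot taken too early
print(list(sort_with_deferred_iterator([3, 1, 2])))  # sorted on first use
```

Under that snapshot assumption, deferring the work only helps if the sorted iterator itself is also created lazily, after the rows have been inserted.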
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19872 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19872 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84414/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19872 **[Test build #84414 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84414/testReport)** for PR 19872 at commit [`4cfaf0e`](https://github.com/apache/spark/commit/4cfaf0e9723bcfbb74dfd1b9d1f5e30682bf072f). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/19872 cc @HyukjinKwon @holdenk @ueshin Passing some basic tests. I will work on this more next week to clean up and add more testing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19872 **[Test build #84414 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84414/testReport)** for PR 19872 at commit [`4cfaf0e`](https://github.com/apache/spark/commit/4cfaf0e9723bcfbb74dfd1b9d1f5e30682bf072f). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154567885 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | the node running the Application Master via the Secure | Distributed Cache, for renewing the login tickets and the | delegation tokens periodically. +| +| Kubernetes only: +| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the +| application must be launched. The namespace must already +| exist in the cluster. (Default: default). --- End diff -- I think if we eventually decide to not have default docker images, we should make the options `--param` ones. I'm not sure if we want to make a call and do that in this PR though. Can we defer this to a later time when we are clearer on how we publish and maintain the images? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18995: [SPARK-21787][SPARK-22672][SQL] Support for pushing down...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/18995 BTW, @cloud-fan . Do you mean literally `move`? I'm wondering if I'm thinking in a different way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
GitHub user icexelloss opened a pull request: https://github.com/apache/spark/pull/19872 WIP: [SPARK-22274][PySpark] User-defined aggregation functions with pandas udf ## What changes were proposed in this pull request? Add support for pandas_udf in groupby().agg() ## How was this patch tested? GroupbyAggTests You can merge this pull request into a Git repository by running: $ git pull https://github.com/icexelloss/spark SPARK-22274-groupby-agg Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19872.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19872 commit f71575782be3f9c41184eeafa275b5ba1cb5fb83 Author: Li Jin Date: 2017-12-01T17:26:26Z Initial commit: wip commit 2e03eec8de2ed6d38e807428c18f2500a8717b32 Author: Li Jin Date: 2017-12-01T22:54:02Z Test working. Need clean up commit 456c4a8adf646ee46b00f8ce51d4e9e8279abc3e Author: Li Jin Date: 2017-12-04T06:34:16Z Add tests commit 35ff548ac942d210ccd99fb2a60b95e2d4a28e2a Author: Li Jin Date: 2017-12-04T06:36:03Z Clean up --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154567693 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -750,6 +756,8 @@ private[joins] class SortMergeJoinScanner( bufferedMatches.clear() false } else { + // To make sure vars like bufferedRow is set + advancedBufferedIterRes --- End diff -- Add a comment like https://github.com/apache/spark/pull/19862/files#r154567168. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19871 **[Test build #84413 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84413/testReport)** for PR 19871 at commit [`2e498f9`](https://github.com/apache/spark/commit/2e498f9c1a748bdb648705c16305f9f34e638ff8). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154567585 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java --- @@ -182,18 +183,14 @@ public UnsafeRow next() { } }; } catch (IOException e) { - cleanupResources(); throw e; +} finally { + // Since we won't ever call next() on an empty iterator, we need to clean up resources + // here in order to prevent memory leaks. + cleanupResources(); --- End diff -- This cleans up the resources even when we successfully return the iterator. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
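The point of the comment above is that a `finally` block runs on the normal return path as well as on the error path. A small sketch makes the ordering visible. The function names and the `log` list are hypothetical; Python's `try`/`finally` is used here as an analogue of the Java construct in the diff.

```python
def rows_with_finally(rows, log):
    """Cleanup in `finally`: runs even when the iterator is returned normally."""
    try:
        return iter(sorted(rows))
    finally:
        log.append("cleanup")  # executed before the caller sees the iterator


def rows_with_cleanup_on_error(rows, log):
    """Cleanup only in the exception handler: skipped on the success path."""
    try:
        return iter(sorted(rows))
    except Exception:
        log.append("cleanup")
        raise


log_finally, log_error_only = [], []
it1 = rows_with_finally([2, 1], log_finally)
it2 = rows_with_cleanup_on_error([2, 1], log_error_only)

print(log_finally)     # cleanup already ran, before any row was consumed
print(log_error_only)  # nothing ran, because no exception was raised
```

In the toy, cleanup does not invalidate the returned iterator, but with a real resource-backed iterator, releasing the resources before the caller consumes any rows would be a problem.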
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154567319 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner( private[this] val bufferedMatches = new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) - // Initialization (note: do _not_ want to advance streamed here). - advancedBufferedToRowWithNullFreeJoinKey() + // Initialization (note: do _not_ want to advance streamed here). This is made lazy to prevent + // unnecessary trigger of calculation. + private lazy val advancedBufferedIterRes = advancedBufferedToRowWithNullFreeJoinKey() --- End diff -- `This is made lazy to run the initialization only once when accessing it.` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
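The lazy initialization discussed above (a Scala `lazy val` whose body runs once, on first access) can be approximated in Python with `functools.cached_property` (Python 3.8+). This is an analogy only: `ToyScanner` and its members are hypothetical, and unlike Scala's `lazy val`, `cached_property` does not by itself guarantee thread-safe initialization.

```python
from functools import cached_property


class ToyScanner:
    """Hypothetical scanner that advances its buffered side lazily, once."""

    def __init__(self, buffered):
        self._buffered = iter(buffered)
        self.buffered_row = None
        self.init_calls = 0

    def _advance_buffered(self):
        self.buffered_row = next(self._buffered, None)
        return self.buffered_row is not None

    @cached_property
    def advanced_buffered_iter_res(self):
        # Runs only on first access; the result is cached afterwards.
        self.init_calls += 1
        return self._advance_buffered()

    def find_next(self):
        # Initialization happens the first time execution reaches here.
        self.advanced_buffered_iter_res
        return self.buffered_row


scanner = ToyScanner([10, 20])
print(scanner.init_calls)   # nothing consumed at construction time
print(scanner.find_next())  # first access triggers the initialization
print(scanner.find_next())  # later accesses reuse the cached result
print(scanner.init_calls)
```

Constructing the scanner does not touch the buffered input, which matches the stated goal of not triggering the calculation unnecessarily.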
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154567195 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala --- @@ -54,11 +55,17 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext { .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false } - test("should fail to load ORC without Hive Support") { -val e = intercept[AnalysisException] { - spark.read.format("orc").load() + test("should fail to load ORC only if spark.sql.orc.enabled=false and without Hive Support") { --- End diff -- Oh, I confused this with [SQLQuerySuite.scala](https://github.com/apache/spark/pull/19871/files#diff-1ea02a6fab84e938582f7f87cc4d9ea1R2157) in hive. Sorry, I'll remove this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154567168 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner( matchJoinKey = null bufferedMatches.clear() false -} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) { - // The new streamed row has the same join key as the previous row, so return the same matches. - true -} else if (bufferedRow == null) { - // The streamed row's join key does not match the current batch of buffered rows and there are - // no more rows to read from the buffered iterator, so there can be no more matches. - matchJoinKey = null - bufferedMatches.clear() - false } else { - // Advance both the streamed and buffered iterators to find the next pair of matching rows. - var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey) - do { -if (streamedRowKey.anyNull) { - advancedStreamed() -} else { - assert(!bufferedRowKey.anyNull) - comp = keyOrdering.compare(streamedRowKey, bufferedRowKey) - if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey() - else if (comp < 0) advancedStreamed() -} - } while (streamedRow != null && bufferedRow != null && comp != 0) - if (streamedRow == null || bufferedRow == null) { -// We have either hit the end of one of the iterators, so there can be no more matches. + // To make sure vars like bufferedRow is set + advancedBufferedIterRes --- End diff -- Then add a comment like `Initialization at the first time reaching here`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154567054 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile --- @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile . + +COPY examples /opt/spark/examples + +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ +env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \ +readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \ +if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +if ! 
[ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS --- End diff -- I think this is debatable. They have some env variables with similar purposes, but they also have role-specific arguments/properties that probably make sense to warrant separate images. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18995: [SPARK-21787][SPARK-22672][SQL] Support for pushing down...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/18995 @cloud-fan . For the other part, I'm restructuring to remove redundancy in the same manner as `InMemoryCatalogedDDLSuite`, `HiveCatalogedDDLSuite`, and `DDLSuite`. This one cannot be part of that restructuring, because the test function signatures are different due to `PredicateLeaf` and `SearchArgument`. Could you review this first? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19871 **[Test build #84412 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84412/testReport)** for PR 19871 at commit [`8b7e88a`](https://github.com/apache/spark/commit/8b7e88a833adf1d3138ce426142b6bb6abe057df). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19869 LGTM aside from super minor comments... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154566562 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner( matchJoinKey = null bufferedMatches.clear() false -} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) { - // The new streamed row has the same join key as the previous row, so return the same matches. - true -} else if (bufferedRow == null) { - // The streamed row's join key does not match the current batch of buffered rows and there are - // no more rows to read from the buffered iterator, so there can be no more matches. - matchJoinKey = null - bufferedMatches.clear() - false } else { - // Advance both the streamed and buffered iterators to find the next pair of matching rows. - var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey) - do { -if (streamedRowKey.anyNull) { - advancedStreamed() -} else { - assert(!bufferedRowKey.anyNull) - comp = keyOrdering.compare(streamedRowKey, bufferedRowKey) - if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey() - else if (comp < 0) advancedStreamed() -} - } while (streamedRow != null && bufferedRow != null && comp != 0) - if (streamedRow == null || bufferedRow == null) { -// We have either hit the end of one of the iterators, so there can be no more matches. + // To make sure vars like bufferedRow is set + advancedBufferedIterRes --- End diff -- Oh. I see. `advancedBufferedIterRes` is a lazy val. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154566501 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -119,5 +139,60 @@ private[spark] object Config extends Logging { "must be a positive integer") .createWithDefault(10) + val WAIT_FOR_APP_COMPLETION = +ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the " + +"launcher process.") + .booleanConf + .createWithDefault(true) + + val REPORT_INTERVAL = +ConfigBuilder("spark.kubernetes.report.interval") + .doc("Interval between reports of the current app status in cluster mode.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + private[spark] val JARS_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir") + .doc("Location to download jars to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pod.") + .stringConf + .createWithDefault("/var/spark-data/spark-jars") + + private[spark] val FILES_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir") + .doc("Location to download files to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pods.") + .stringConf + .createWithDefault("/var/spark-data/spark-files") --- End diff -- See comment above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154566445 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -882,45 +851,65 @@ case class HashAggregateExec( |${evaluateVariables(unsafeRowBufferEvals)} |// update unsafe row buffer |${updateUnsafeRowBuffer.mkString("\n").trim} - """.stripMargin + """.stripMargin } +val updateRowInHashMap: String = { + if (isFastHashMapEnabled) { +ctx.INPUT_ROW = fastRowBuffer +val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr)) +val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) +val effectiveCodes = subExprs.codes.mkString("\n") +val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) { + boundUpdateExpr.map(_.genCode(ctx)) +} +val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) => + val dt = updateExpr(i).dataType + ctx.updateColumn( +fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled) +} + +// If fast hash map is on, we first generate code to update row in fast hash map, if the +// previous loop up hit fast hash map. Otherwise, update row in regular hash map. +s""" + |if ($fastRowBuffer != null) { + | // common sub-expressions + | $effectiveCodes + | // evaluate aggregate function + | ${evaluateVariables(fastRowEvals)} + | // update fast row + | ${updateFastRow.mkString("\n").trim} + |} else { + | $updateRowInRegularHashMap + |} + """.stripMargin + } else { +updateRowInRegularHashMap + } +} + +val declareFastRowBuffer: String = if (isFastHashMapEnabled) { + val rowType = if (isVectorizedHashMapEnabled) { +classOf[MutableColumnarRow].getName + } else { +"UnsafeRow" + } + s"$rowType $fastRowBuffer = null;" +} else "" // We try to do hash map based in-memory aggregation first. 
If there is not enough memory (the // hash map will return null for new key), we spill the hash map to disk to free memory, then // continue to do in-memory aggregation and spilling until all the rows had been processed. // Finally, sort the spilled aggregate buffers by key, and merge them together for same key. s""" UnsafeRow $unsafeRowBuffer = null; --- End diff -- nit: How about this, just for consistency? ``` s""" $declareRowBuffer $findOrInsertHashMap $incCounter $updateRowInHashMap """ ``` Then: ``` val declareRowBuffer: String = if (isFastHashMapEnabled) { val rowType = if (isVectorizedHashMapEnabled) { classOf[MutableColumnarRow].getName } else { "UnsafeRow" } s""" |UnsafeRow $unsafeRowBuffer = null; |$rowType $fastRowBuffer = null; """.stripMargin } else { s"UnsafeRow $unsafeRowBuffer = null;" } ```
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154565659 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -768,12 +762,8 @@ case class HashAggregateExec( // generate hash code for key val hashExpr = Murmur3Hash(groupingExpressions, 42) -ctx.currentVars = input val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx) -val inputAttr = aggregateBufferAttributes ++ child.output -ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input - val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter, incCounter) = if (testFallbackStartsAt.isDefined) { --- End diff -- nit: does this line need some indentation at the start?
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154561232 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -621,34 +622,30 @@ case class HashAggregateExec( val iterTerm = ctx.freshName("mapIter") ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm) -def generateGenerateCode(): String = { - if (isFastHashMapEnabled) { -if (isVectorizedHashMapEnabled) { - s""" - | ${fastHashMapGenerator.asInstanceOf[VectorizedHashMapGenerator].generate()} - """.stripMargin -} else { - s""" - | ${fastHashMapGenerator.asInstanceOf[RowBasedHashMapGenerator].generate()} - """.stripMargin -} - } else "" +if (isFastHashMapEnabled) { + val generatedMap = if (isVectorizedHashMapEnabled) { + fastHashMapGenerator.asInstanceOf[VectorizedHashMapGenerator].generate() + } else { + fastHashMapGenerator.asInstanceOf[RowBasedHashMapGenerator].generate() + } + ctx.addInnerClass(generatedMap) } -ctx.addInnerClass(generateGenerateCode()) val doAgg = ctx.freshName("doAggregateWithKeys") val peakMemory = metricTerm(ctx, "peakMemory") val spillSize = metricTerm(ctx, "spillSize") val avgHashProbe = metricTerm(ctx, "avgHashProbe") +val finishFashHashMap = if (isFastHashMapEnabled) { --- End diff -- nit: can we merge this branch with the hash map branch above (line 625)?
[GitHub] spark pull request #19869: [SPARK-22677][SQL] cleanup whole stage codegen fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154562413 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -672,48 +668,56 @@ case class HashAggregateExec( def outputFromRowBasedMap: String = { s""" - while ($iterTermForFastHashMap.next()) { - $numOutput.add(1); - UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey(); - UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue(); - $outputFunc($keyTerm, $bufferTerm); - - if (shouldStop()) return; - } - $fastHashMapTerm.close(); - """ + |while ($iterTermForFastHashMap.next()) { --- End diff -- super nit: can we also drop the unnecessary leading spaces at the start of generated lines throughout this file? E.g., change ``` s""" | private void $doAgg() throws java.io.IOException { ``` to ``` s""" |private void $doAgg() throws java.io.IOException { ``` See https://github.com/cloud-fan/spark/blob/9b8ae3d6635c5ed0323bf088e20d0de55dd1c098/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L233
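Editorial note: the spacing nit above comes down to how `stripMargin` treats text after the `|` margin character. A minimal sketch, assuming nothing beyond the Scala standard library; `MarginDemo`, `padded`, `flush`, and `firstCodeLine` are hypothetical names used only for illustration and do not appear in the PR:

```scala
// Hypothetical demo object; not part of HashAggregateExec.
object MarginDemo {
  val name = "doAgg"

  // A space between '|' and the text is part of the string and survives stripMargin.
  val padded: String =
    s"""
       | private void $name() throws java.io.IOException {
       | }
     """.stripMargin

  // With the text directly after '|', the generated line starts at column 0.
  val flush: String =
    s"""
       |private void $name() throws java.io.IOException {
       |}
     """.stripMargin

  // Returns the first line that is not blank, keeping its leading whitespace.
  def firstCodeLine(s: String): String =
    s.split("\n").filter(_.trim.nonEmpty).head
}
```

`stripMargin` removes only the whitespace before the `|` and the `|` itself, so any space typed after the margin character ends up in the generated Java source.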
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154566463 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -700,38 +701,43 @@ private[joins] class SortMergeJoinScanner( bufferedMatches.clear() false } else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) { - // The new streamed row has the same join key as the previous row, so return the same matches. + // The new streamed row has the same join key as the previous row, so return the same + // matches. --- End diff -- Unnecessary change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154566374 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner( matchJoinKey = null bufferedMatches.clear() false -} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) { - // The new streamed row has the same join key as the previous row, so return the same matches. - true -} else if (bufferedRow == null) { - // The streamed row's join key does not match the current batch of buffered rows and there are - // no more rows to read from the buffered iterator, so there can be no more matches. - matchJoinKey = null - bufferedMatches.clear() - false } else { - // Advance both the streamed and buffered iterators to find the next pair of matching rows. - var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey) - do { -if (streamedRowKey.anyNull) { - advancedStreamed() -} else { - assert(!bufferedRowKey.anyNull) - comp = keyOrdering.compare(streamedRowKey, bufferedRowKey) - if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey() - else if (comp < 0) advancedStreamed() -} - } while (streamedRow != null && bufferedRow != null && comp != 0) - if (streamedRow == null || bufferedRow == null) { -// We have either hit the end of one of the iterators, so there can be no more matches. + // To make sure vars like bufferedRow is set + advancedBufferedIterRes --- End diff -- Once we have advanced both the streamed and buffered iterators and called `bufferMatchingRows` in the previous turn, the buffered iterator has been advanced until `bufferedRow` no longer matches the current `streamedRowKey`. In the next turn, the call to `advancedBufferedIterRes` here will advance the buffered iterator again, so that `bufferedRow` will be skipped.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154566357 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. 
+ * + * @param mainAppResource the main application resource if any + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: Option[MainAppResource], + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource, + mainClass.get, + driverArgs.toArray) + } +} + +/** + * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * watcher that monitors and logs the application status. Waits for the application to terminate if + * spark.kubernetes.submission.waitAppCompletion is true. 
+ * + * @param submissionSteps steps that collectively configure the driver + * @param submissionSparkConf the submission client Spark configuration + * @param kubernetesClient the client to talk to the Kubernetes API server + * @param waitForAppCompletion a flag indicating whether the client should wait for the application + * to complete + * @param appName the application name + * @param loggingPodStatusWatcher a watcher that monitors and logs the application status + */ +private[spark] class Client( +submissionSteps: Seq[DriverConfigurationStep], +submissionSparkConf: SparkConf, +kubernetesClient: KubernetesClient, +waitForAppCompletion: Boolean, +appName: String, +loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { + + private val driverJavaOptions = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + + /** +* Run command that initializes a DriverSpec that will be updated after each +* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec +* will be used to build the Driver Container, Driver Pod, and Kubernetes
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154566070 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile --- @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile . + +COPY examples /opt/spark/examples + +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ +env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \ +readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \ +if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +if ! 
[ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP --- End diff -- `SPARK_EXECUTOR_PORT` is no longer set as `spark.executor.port` is no longer used by Spark. Removed `SPARK_EXECUTOR_PORT`. `SPARK_MOUNTED_CLASSPATH` is set in `DependencyResolutionStep`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154566094 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.ServiceBuilder + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec +import org.apache.spark.internal.Logging +import org.apache.spark.util.Clock + +/** + * Allows the driver to be reachable by executor pods through a headless service. The service's + * ports should correspond to the ports that the executor will reach the pod at for RPC. 
+ */ +private[spark] class DriverServiceBootstrapStep( +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +submissionSparkConf: SparkConf, +clock: Clock) extends DriverConfigurationStep with Logging { + import DriverServiceBootstrapStep._ + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty, + s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " + + "address is managed and set to the driver pod's IP address.") +require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty, + s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " + + "managed via a Kubernetes service.") + +val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX" +val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { + preferredServiceName +} else { + val randomServiceId = clock.getTimeMillis() + val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" + logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + +s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " + +s"$shorterServiceName as the driver service's name.") + shorterServiceName +} + +val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT) +val driverBlockManagerPort = submissionSparkConf.getInt( +org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) +val driverService = new ServiceBuilder() + .withNewMetadata() +.withName(resolvedServiceName) +.endMetadata() + .withNewSpec() +.withClusterIP("None") +.withSelector(driverLabels.asJava) +.addNewPort() + .withName(DRIVER_PORT_NAME) + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() +.addNewPort() + .withName(BLOCK_MANAGER_PORT_NAME) + .withPort(driverBlockManagerPort) + .withNewTargetPort(driverBlockManagerPort) + .endPort() +.endSpec() + .build() + +val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE) +val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local" +val resolvedSparkConf = driverSpec.driverSparkConf.clone() + .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname) --- End diff -- What do you mean here?
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154565988 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -568,8 +571,12 @@ object DataSource extends Logging { "org.apache.spark.Logging") /** Given a provider name, look up the data source class definition. */ - def lookupDataSource(provider: String): Class[_] = { -val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) + def lookupDataSource(conf: SQLConf, provider: String): Class[_] = { +var provider1 = backwardCompatibilityMap.getOrElse(provider, provider) +if (Seq("orc").contains(provider1.toLowerCase) && conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) { --- End diff -- Oh. Yep. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19871 Thank you for the review, @cloud-fan and @jiangxb1987. The PR is updated.
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154565883 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala --- @@ -54,11 +55,17 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext { .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false } - test("should fail to load ORC without Hive Support") { -val e = intercept[AnalysisException] { - spark.read.format("orc").load() + test("should fail to load ORC only if spark.sql.orc.enabled=false and without Hive Support") { --- End diff -- That test also checks the exception: https://github.com/apache/spark/pull/19871/files#diff-5a2e7f03d14856c8769fd3ddea8742bdR2790
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19871 **[Test build #84411 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84411/testReport)** for PR 19871 at commit [`e7beb02`](https://github.com/apache/spark/commit/e7beb02ef8b1f9761c77952fc69d087c7cc92d3f).
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154565755 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -568,8 +571,12 @@ object DataSource extends Logging { "org.apache.spark.Logging") /** Given a provider name, look up the data source class definition. */ - def lookupDataSource(provider: String): Class[_] = { -val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) + def lookupDataSource(conf: SQLConf, provider: String): Class[_] = { +var provider1 = backwardCompatibilityMap.getOrElse(provider, provider) +if (Seq("orc").contains(provider1.toLowerCase) && conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) { --- End diff -- "orc".equalsIgnoreCase(...)
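Editorial note: the suggestion above replaces a one-element `Seq(...).contains(...)` lookup with a direct case-insensitive comparison. A small sketch of the two forms side by side; `ProviderMatch` and its method names are hypothetical, and the use of `Locale.ROOT` is an assumption added here (the quoted diff calls `toLowerCase` without a locale):

```scala
import java.util.Locale

// Hypothetical helper; not part of DataSource.scala.
object ProviderMatch {
  // Form used in the diff: lower-case the name, then look it up in a one-element Seq.
  // Throws a NullPointerException if provider is null.
  def viaContains(provider: String): Boolean =
    Seq("orc").contains(provider.toLowerCase(Locale.ROOT))

  // Form suggested in the review: compare directly, ignoring case.
  // Returns false for a null argument.
  def viaEqualsIgnoreCase(provider: String): Boolean =
    "orc".equalsIgnoreCase(provider)
}
```

Both forms agree on ordinary inputs such as `"orc"`, `"ORC"`, and `"parquet"`; the second avoids allocating a collection and a lower-cased copy of the string.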
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154565597 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -296,6 +298,12 @@ object SparkSubmit extends CommandLineUtils with Logging { case (STANDALONE, CLUSTER) if args.isR => printErrorAndExit("Cluster deploy mode is currently not supported for R " + "applications on standalone clusters.") + case (KUBERNETES, CLIENT) => +printErrorAndExit("Client mode is currently not supported for Kubernetes.") + case (KUBERNETES, _) if args.isPython => +printErrorAndExit("Python applications are currently not supported for Kubernetes.") + case (KUBERNETES, _) if args.isR => +printErrorAndExit("R applications are currently not supported for Kubernetes.") --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154565348 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -568,8 +570,13 @@ object DataSource extends Logging { "org.apache.spark.Logging") /** Given a provider name, look up the data source class definition. */ - def lookupDataSource(provider: String): Class[_] = { -val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) + def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = { +var provider1 = backwardCompatibilityMap.getOrElse(provider, provider) +if (Seq("orc", "org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) && +sparkSession.conf.get(SQLConf.ORC_ENABLED)) { --- End diff -- It's done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19856: [SPARK-22664] The logs about "Connected to Zookee...
Github user liu-zhaokun closed the pull request at: https://github.com/apache/spark/pull/19856
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19871 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84409/
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19871 Merged build finished. Test PASSed.
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19871 **[Test build #84409 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84409/testReport)** for PR 19871 at commit [`37e240c`](https://github.com/apache/spark/commit/37e240cf12ea7463c2e0ea56501b812e12745869). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154563897 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner( private[this] val bufferedMatches = new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) - // Initialization (note: do _not_ want to advance streamed here). - advancedBufferedToRowWithNullFreeJoinKey() + // Initialization (note: do _not_ want to advance streamed here). This is made lazy to prevent + // unnecessary trigger of calculation. + private lazy val advancedBufferedIterRes = advancedBufferedToRowWithNullFreeJoinKey() --- End diff -- This function should be called (to try to set `bufferedRow`) before `bufferedRow` is checked, and it should be called only once. That requirement comes from the original logic. Given that, I think a lazy val is the best way to add this optimization.
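Editorial note: the "called before the check, and only once" requirement described above is what a Scala `lazy val` provides. A minimal sketch, assuming a plain `Iterator[Int]` in place of the buffered row iterator; `LazyAdvanceDemo`, `advanceCalls`, and `currentBufferedRow` are hypothetical names for illustration and are not the scanner's real API:

```scala
// Hypothetical stand-in for the scanner; only the lazy-initialization pattern is the point.
class LazyAdvanceDemo(buffered: Iterator[Int]) {
  var advanceCalls = 0                 // counts how often the initializer body runs
  var bufferedRow: Option[Int] = None  // plays the role of bufferedRow in the diff

  // Moves to the next buffered row, if there is one.
  private def advanceBuffered(): Boolean = {
    advanceCalls += 1
    bufferedRow = if (buffered.hasNext) Some(buffered.next()) else None
    bufferedRow.isDefined
  }

  // Evaluated on first access only; later accesses return the cached result
  // without touching the iterator again.
  private lazy val advancedBufferedIterRes: Boolean = advanceBuffered()

  def currentBufferedRow(): Option[Int] = {
    advancedBufferedIterRes  // forces initialization once, no effect afterwards
    bufferedRow
  }
}
```

Constructing the object does not consume anything from the iterator; the first call to `currentBufferedRow()` advances it exactly once, and repeated calls see the same row, which is why no buffered row is skipped on later turns.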
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19862#discussion_r154564327 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner( matchJoinKey = null bufferedMatches.clear() false -} else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) { - // The new streamed row has the same join key as the previous row, so return the same matches. - true -} else if (bufferedRow == null) { - // The streamed row's join key does not match the current batch of buffered rows and there are - // no more rows to read from the buffered iterator, so there can be no more matches. - matchJoinKey = null - bufferedMatches.clear() - false } else { - // Advance both the streamed and buffered iterators to find the next pair of matching rows. - var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey) - do { -if (streamedRowKey.anyNull) { - advancedStreamed() -} else { - assert(!bufferedRowKey.anyNull) - comp = keyOrdering.compare(streamedRowKey, bufferedRowKey) - if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey() - else if (comp < 0) advancedStreamed() -} - } while (streamedRow != null && bufferedRow != null && comp != 0) - if (streamedRow == null || bufferedRow == null) { -// We have either hit the end of one of the iterators, so there can be no more matches. + // To make sure vars like bufferedRow is set + advancedBufferedIterRes --- End diff -- This advance function is only called once actually, so no `bufferedRow` will be missed. Or maybe I didn't understand your meaning? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user gczsjdy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19862#discussion_r154564488

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
           matchJoinKey = null
           bufferedMatches.clear()
           false
    -    } else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
    -      // The new streamed row has the same join key as the previous row, so return the same matches.
    -      true
    -    } else if (bufferedRow == null) {
    -      // The streamed row's join key does not match the current batch of buffered rows and there are
    -      // no more rows to read from the buffered iterator, so there can be no more matches.
    -      matchJoinKey = null
    -      bufferedMatches.clear()
    -      false
         } else {
    -      // Advance both the streamed and buffered iterators to find the next pair of matching rows.
    -      var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
    -      do {
    -        if (streamedRowKey.anyNull) {
    -          advancedStreamed()
    -        } else {
    -          assert(!bufferedRowKey.anyNull)
    -          comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
    -          if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
    -          else if (comp < 0) advancedStreamed()
    -        }
    -      } while (streamedRow != null && bufferedRow != null && comp != 0)
    -      if (streamedRow == null || bufferedRow == null) {
    -        // We have either hit the end of one of the iterators, so there can be no more matches.
    +      // To make sure vars like bufferedRow is set
    +      advancedBufferedIterRes
    +      if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
    --- End diff --

    I agree with you.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154564066

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
    @@ -251,6 +252,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
             YARN
           case m if m.startsWith("spark") => STANDALONE
           case m if m.startsWith("mesos") => MESOS
    +      case m if m.startsWith("k8s") => KUBERNETES
           case m if m.startsWith("local") => LOCAL
           case _ => printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
    --- End diff --

    Done.
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19871#discussion_r154563993

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -363,6 +363,11 @@ object SQLConf {
         .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
         .createWithDefault("snappy")
    +  val ORC_ENABLED = buildConf("spark.sql.orc.enabled")
    +    .doc("When true, use OrcFileFormat in sql/core module instead of the one in sql/hive module.")
    --- End diff --

    Yep. I'll elaborate more.
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19871#discussion_r154563902

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala ---
    @@ -54,11 +55,17 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
           .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false
       }
    -  test("should fail to load ORC without Hive Support") {
    -    val e = intercept[AnalysisException] {
    -      spark.read.format("orc").load()
    +  test("should fail to load ORC only if spark.sql.orc.enabled=false and without Hive Support") {
    --- End diff --

    Those tests cover different cases.
    - In this test: `true` -> use the new OrcFileFormat, `false` -> throw an exception (the existing behavior).
    - In that test: `true` -> use the new OrcFileFormat, `false` -> use the old OrcFileFormat (the existing behavior).
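The two cases listed above can be written out as a small decision function. This is only a sketch of the behavior as described in the comment, not the PR's implementation: `OrcChoice`, `OrcSelection.resolve`, the `hiveAvailable` parameter, and the error text are all hypothetical, and it assumes "that test" runs with Hive support available.

```scala
// Hypothetical model of which ORC implementation gets picked. Not Spark code.
sealed trait OrcChoice
case object NewOrcFileFormat extends OrcChoice // the one in the sql/core module
case object OldOrcFileFormat extends OrcChoice // the one in the sql/hive module

object OrcSelection {
  // orcEnabled stands for the value of spark.sql.orc.enabled;
  // hiveAvailable stands for "running with Hive support" (an assumption here).
  def resolve(orcEnabled: Boolean, hiveAvailable: Boolean): Either[String, OrcChoice] =
    (orcEnabled, hiveAvailable) match {
      case (true, _)      => Right(NewOrcFileFormat)
      case (false, true)  => Right(OldOrcFileFormat)
      // Illustrative message only; the real exception text is not shown in the thread.
      case (false, false) => Left("ORC is unavailable without Hive support")
    }
}

object OrcSelectionDemo {
  def main(args: Array[String]): Unit = {
    // "This test": no Hive support.
    assert(OrcSelection.resolve(orcEnabled = true, hiveAvailable = false) == Right(NewOrcFileFormat))
    assert(OrcSelection.resolve(orcEnabled = false, hiveAvailable = false).isLeft)
    // "That test": Hive support present (assumed).
    assert(OrcSelection.resolve(orcEnabled = true, hiveAvailable = true) == Right(NewOrcFileFormat))
    assert(OrcSelection.resolve(orcEnabled = false, hiveAvailable = true) == Right(OldOrcFileFormat))
  }
}
```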
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19871#discussion_r154563532

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
    @@ -568,8 +570,13 @@ object DataSource extends Logging {
         "org.apache.spark.Logging")
       /** Given a provider name, look up the data source class definition. */
    -  def lookupDataSource(provider: String): Class[_] = {
    -    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
    +  def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = {
    +    var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
    +    if (Seq("orc", "org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) &&
    --- End diff --

    I see.
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19871#discussion_r154563501

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
    @@ -568,8 +570,13 @@ object DataSource extends Logging {
         "org.apache.spark.Logging")
       /** Given a provider name, look up the data source class definition. */
    -  def lookupDataSource(provider: String): Class[_] = {
    -    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
    +  def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = {
    --- End diff --

    Yep.
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19871#discussion_r154563142

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -363,6 +363,11 @@ object SQLConf {
         .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
         .createWithDefault("snappy")
    +  val ORC_ENABLED = buildConf("spark.sql.orc.enabled")
    --- End diff --

    Sure!
[GitHub] spark issue #19855: [SPARK-22662] [SQL] Failed to prune columns after rewrit...
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19855

    Merged build finished. Test PASSed.
[GitHub] spark issue #19855: [SPARK-22662] [SQL] Failed to prune columns after rewrit...
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19855

    Test PASSed.
    Refer to this link for build results (access rights to CI server needed):
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84407/
    Test PASSed.
[GitHub] spark issue #19855: [SPARK-22662] [SQL] Failed to prune columns after rewrit...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19855

    **[Test build #84407 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84407/testReport)** for PR 19855 at commit [`f9e9d10`](https://github.com/apache/spark/commit/f9e9d10d4b581379c2af966a35362b77f35a7b6f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19862#discussion_r154556774

    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java ---
    @@ -159,6 +159,12 @@ public boolean hasNext() {
           @Override
           public UnsafeRow next() {
             try {
    +          if (!alreadyCalculated) {
    +            while (inputIterator.hasNext()) {
    +              insertRow(inputIterator.next());
    +            }
    +            alreadyCalculated = true;
    --- End diff --

    We clean up resources at L144 when the iterator is empty. We should still do that here.
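The concern above is that deferring the insert loop into `next()` should not skip the cleanup that used to run for an empty input. A minimal Scala sketch of that idea follows; it is not the `UnsafeExternalRowSorter` code, and `DeferredSortIterator`, `ensureSorted`, `releaseOnce`, and the `cleanup` callback are hypothetical names. An in-memory `sorted` call stands in for the real external sort.

```scala
// Hypothetical sketch: sort lazily on first access, and release resources
// as soon as the sorted output is known to be exhausted (including when empty).
final class DeferredSortIterator[A: Ordering](input: Iterator[A], cleanup: () => Unit)
    extends Iterator[A] {

  private var sorted: Iterator[A] = null
  private var cleaned = false

  private def releaseOnce(): Unit =
    if (!cleaned) { cleaned = true; cleanup() }

  private def ensureSorted(): Unit = {
    if (sorted == null) {
      // Input is consumed here, on first access, not at construction time.
      sorted = input.toVector.sorted.iterator
    }
    // Covers the empty-input case: nothing to return, so clean up right away.
    if (!sorted.hasNext) releaseOnce()
  }

  override def hasNext: Boolean = { ensureSorted(); sorted.hasNext }

  override def next(): A = {
    ensureSorted()
    val value = sorted.next()
    if (!sorted.hasNext) releaseOnce() // last element handed out
    value
  }
}

object DeferredSortDemo {
  def main(args: Array[String]): Unit = {
    var cleanups = 0
    val it = new DeferredSortIterator[Int](Iterator(3, 1, 2), () => cleanups += 1)
    assert(cleanups == 0)
    assert(it.toList == List(1, 2, 3))
    assert(cleanups == 1)

    val empty = new DeferredSortIterator[Int](Iterator.empty, () => cleanups += 1)
    assert(!empty.hasNext)
    assert(cleanups == 2) // cleanup still runs for an empty input
  }
}
```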
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19862#discussion_r154558106

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
           matchJoinKey = null
           bufferedMatches.clear()
           false
    -    } else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
    -      // The new streamed row has the same join key as the previous row, so return the same matches.
    -      true
    -    } else if (bufferedRow == null) {
    -      // The streamed row's join key does not match the current batch of buffered rows and there are
    -      // no more rows to read from the buffered iterator, so there can be no more matches.
    -      matchJoinKey = null
    -      bufferedMatches.clear()
    -      false
         } else {
    -      // Advance both the streamed and buffered iterators to find the next pair of matching rows.
    -      var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
    -      do {
    -        if (streamedRowKey.anyNull) {
    -          advancedStreamed()
    -        } else {
    -          assert(!bufferedRowKey.anyNull)
    -          comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
    -          if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
    -          else if (comp < 0) advancedStreamed()
    -        }
    -      } while (streamedRow != null && bufferedRow != null && comp != 0)
    -      if (streamedRow == null || bufferedRow == null) {
    -        // We have either hit the end of one of the iterators, so there can be no more matches.
    +      // To make sure vars like bufferedRow is set
    +      advancedBufferedIterRes
    +      if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
    --- End diff --

    This check can be moved out of this else block and kept at its original position. If this condition is hit, we don't need to advance the buffered rows at all.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19862#discussion_r154560155

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -699,39 +700,44 @@ private[joins] class SortMergeJoinScanner(
           matchJoinKey = null
           bufferedMatches.clear()
           false
    -    } else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
    -      // The new streamed row has the same join key as the previous row, so return the same matches.
    -      true
    -    } else if (bufferedRow == null) {
    -      // The streamed row's join key does not match the current batch of buffered rows and there are
    -      // no more rows to read from the buffered iterator, so there can be no more matches.
    -      matchJoinKey = null
    -      bufferedMatches.clear()
    -      false
         } else {
    -      // Advance both the streamed and buffered iterators to find the next pair of matching rows.
    -      var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
    -      do {
    -        if (streamedRowKey.anyNull) {
    -          advancedStreamed()
    -        } else {
    -          assert(!bufferedRowKey.anyNull)
    -          comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
    -          if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
    -          else if (comp < 0) advancedStreamed()
    -        }
    -      } while (streamedRow != null && bufferedRow != null && comp != 0)
    -      if (streamedRow == null || bufferedRow == null) {
    -        // We have either hit the end of one of the iterators, so there can be no more matches.
    +      // To make sure vars like bufferedRow is set
    +      advancedBufferedIterRes
    --- End diff --

    If we advance the buffered iterator here, won't we miss the `bufferedRow` that was advanced to before?
[GitHub] spark issue #17770: [SPARK-20392][SQL] Set barrier to prevent re-entering a ...
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/17770

    Yea, a new PR sounds good, thanks!
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19862#discussion_r154560524

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -674,8 +674,9 @@ private[joins] class SortMergeJoinScanner(
       private[this] val bufferedMatches =
         new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
    -  // Initialization (note: do _not_ want to advance streamed here).
    -  advancedBufferedToRowWithNullFreeJoinKey()
    +  // Initialization (note: do _not_ want to advance streamed here). This is made lazy to prevent
    +  // unnecessary trigger of calculation.
    +  private lazy val advancedBufferedIterRes = advancedBufferedToRowWithNullFreeJoinKey()
    --- End diff --

    Isn't this the same as simply calling `advancedBufferedToRowWithNullFreeJoinKey` where it is needed?
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19862#discussion_r154560474

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -750,6 +756,8 @@ private[joins] class SortMergeJoinScanner(
           bufferedMatches.clear()
           false
         } else {
    +      // To make sure vars like bufferedRow is set
    +      advancedBufferedIterRes
    --- End diff --

    Ditto. We may miss the `bufferedRow` that was advanced to before.