[GitHub] spark pull request #19767: [SPARK-22543][SQL] fix java 64kb compile error fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19767#discussion_r152489116

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---

@@ -105,6 +105,36 @@ abstract class Expression extends TreeNode[Expression] {
     val isNull = ctx.freshName("isNull")
     val value = ctx.freshName("value")
     val ve = doGenCode(ctx, ExprCode("", isNull, value))
+
+    // TODO: support whole stage codegen too
+    if (ve.code.trim.length > 1024 && ctx.INPUT_ROW != null && ctx.currentVars == null) {

--- End diff --

I think it won't work because we would hit other limitations, e.g. the JVM constant pool. I'll try something bigger, like `100`.
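For context, the hunk under discussion moves an oversized generated expression body into a private method on the generated class, so the enclosing method stays under the JVM's 64KB bytecode limit per method. A condensed sketch of that idea, mirroring the calls visible in the full diff quoted later in this digest (`maybeSplitCode` and `threshold` are illustrative names, not Spark API; the `isNull` bookkeeping is omitted here):

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.DataType

// If the generated code for one expression is too long, register it as a
// private method on the generated class and replace the inline code with a
// single call. Only valid when reading from ctx.INPUT_ROW (i.e. no whole-stage
// codegen), because the hoisted method takes the row as its sole argument.
def maybeSplitCode(ctx: CodegenContext, ve: ExprCode, dataType: DataType,
    nodeName: String, threshold: Int): ExprCode = {
  if (ve.code.trim.length > threshold && ctx.INPUT_ROW != null && ctx.currentVars == null) {
    val javaType = ctx.javaType(dataType)
    val newValue = ctx.freshName("value")
    val funcName = ctx.freshName(nodeName)
    // addNewFunction registers the method and returns its (qualified) name.
    val funcFullName = ctx.addNewFunction(funcName,
      s"""
         |private $javaType $funcName(InternalRow ${ctx.INPUT_ROW}) {
         |  ${ve.code.trim}
         |  return ${ve.value};
         |}
       """.stripMargin)
    ve.value = newValue
    ve.code = s"$javaType $newValue = $funcFullName(${ctx.INPUT_ROW});"
  }
  ve
}
```

The threshold is exactly what is being debated here: too small and the number of generated methods runs into other JVM limits such as the constant pool; too large and a single method can still exceed 64KB.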
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19779 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84094/ Test PASSed.
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19779 Merged build finished. Test PASSed.
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19779

**[Test build #84094 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84094/testReport)** for PR 19779 at commit [`e3651ef`](https://github.com/apache/spark/commit/e3651ef06d3ac3232b447df5d450632d8fde8ce2).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19082: [SPARK-21870][SQL] Split aggregation code into small fun...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19082 ya, enjoy!
[GitHub] spark issue #19082: [SPARK-21870][SQL] Split aggregation code into small fun...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19082 Will review it carefully after I finish my vacation. Thanks!
[GitHub] spark pull request #19257: [SPARK-22042] [SQL] ReorderJoinPredicates can bre...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19257#discussion_r152484490

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---

@@ -265,6 +268,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
           if (childPartitioning.guarantees(partitioning)) child else operator
         case _ => operator
       }
-    case operator: SparkPlan => ensureDistributionAndOrdering(operator)
+    case operator: SparkPlan =>
+      ensureDistributionAndOrdering(reorderJoinPredicates.apply(operator))

--- End diff --

Could you add a comment to explain why we do it here? It is hard for newcomers to understand the assumptions we made here.
[GitHub] spark pull request #19257: [SPARK-22042] [SQL] ReorderJoinPredicates can bre...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19257#discussion_r152484440

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---

@@ -265,6 +268,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
           if (childPartitioning.guarantees(partitioning)) child else operator
         case _ => operator
       }
-    case operator: SparkPlan => ensureDistributionAndOrdering(operator)
+    case operator: SparkPlan =>
+      ensureDistributionAndOrdering(reorderJoinPredicates.apply(operator))

--- End diff --

Then, do something like

```Scala
ensureDistributionAndOrdering(ReorderJoinPredicates(operator))
```
[GitHub] spark pull request #19257: [SPARK-22042] [SQL] ReorderJoinPredicates can bre...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19257#discussion_r152484350

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---

@@ -31,6 +32,8 @@ import org.apache.spark.sql.internal.SQLConf
  * input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
+  private val reorderJoinPredicates = new ReorderJoinPredicates

--- End diff --

Change `class ReorderJoinPredicates` to `object ReorderJoinPredicates`?
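Taken together, the three comments above point at a shape like the following (a sketch only: the body of `ReorderJoinPredicates` is reduced to an identity pass so the snippet compiles, and the surrounding `EnsureRequirements` match is shown as comments):

```scala
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// An object (rather than a class instantiated by every EnsureRequirements)
// keeps the call site short: ReorderJoinPredicates(operator).
object ReorderJoinPredicates extends Rule[SparkPlan] {
  // Placeholder body; the real rule reorders join keys to match the
  // output partitioning of the join's children.
  override def apply(plan: SparkPlan): SparkPlan = plan
}

// Inside EnsureRequirements.apply, with a comment explaining the ordering
// assumption, as requested above:
//   case operator: SparkPlan =>
//     // Join predicates must be reordered before exchanges are planned, so
//     // that the required child distributions reflect the final key order.
//     ensureDistributionAndOrdering(ReorderJoinPredicates(operator))
```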
[GitHub] spark pull request #19651: [SPARK-20682][SPARK-15474][SPARK-21791] Add new O...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19651#discussion_r152482492

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala ---

@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.io._
+import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
+import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.orc.OrcUtils.withNullSafe
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+private[orc] class OrcDeserializer(
+    dataSchema: StructType,
+    requiredSchema: StructType,
+    missingColumnNames: Seq[String]) {
+
+  private[this] val mutableRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
+
+  private[this] val length = requiredSchema.length
+
+  private[this] val unwrappers = requiredSchema.map(_.dataType).map(unwrapperFor).toArray
+
+  def deserialize(orcStruct: OrcStruct): InternalRow = {
+    var i = 0
+    val names = orcStruct.getSchema.getFieldNames
+    while (i < length) {
+      val name = requiredSchema(i).name
+      val writable = if (missingColumnNames.contains(name)) {
+        null
+      } else {
+        if (names.contains(name)) {
+          orcStruct.getFieldValue(name)
+        } else {
+          orcStruct.getFieldValue("_col" + dataSchema.fieldIndex(name))
+        }
+      }
+      if (writable == null) {
+        mutableRow.setNullAt(i)
+      } else {
+        unwrappers(i)(writable, mutableRow, i)
+      }
+      i += 1
+    }
+    mutableRow
+  }
+
+  private[this] def unwrapperFor(dataType: DataType): (Any, InternalRow, Int) => Unit =
+    dataType match {
+      case NullType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setNullAt(ordinal)
+
+      case BooleanType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setBoolean(ordinal, value.asInstanceOf[BooleanWritable].get)
+
+      case ByteType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setByte(ordinal, value.asInstanceOf[ByteWritable].get)
+
+      case ShortType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setShort(ordinal, value.asInstanceOf[ShortWritable].get)
+
+      case IntegerType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setInt(ordinal, value.asInstanceOf[IntWritable].get)
+
+      case LongType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setLong(ordinal, value.asInstanceOf[LongWritable].get)
+
+      case FloatType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setFloat(ordinal, value.asInstanceOf[FloatWritable].get)
+
+      case DoubleType =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setDouble(ordinal, value.asInstanceOf[DoubleWritable].get)
+
+      case _ =>
+        val unwrapper = getValueUnwrapper(dataType)
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row(ordinal) = unwrapper(value)

--- End diff --

And, for example, if the mappings look like the following, do we need to refactor some of the pattern between Parquet and ORC?

- `ArrayDataUpdater` <= `ParquetArrayConverter`
- `MapDataUpdater` <= `ParquetMapConverter`
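The deserializer above resolves one updater closure per required field up front, so the per-row `while` loop does no type dispatch at all. A stripped-down sketch of that pattern in isolation (simplified; only a few primitive cases shown, with a stand-in for the complex-type path):

```scala
import org.apache.hadoop.io.{BooleanWritable, IntWritable, LongWritable}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

// Resolve a (value, row, ordinal) => Unit updater once per schema field,
// instead of pattern matching on the data type once per row.
def unwrapperFor(dt: DataType): (Any, InternalRow, Int) => Unit = dt match {
  case BooleanType => (v, row, i) => row.setBoolean(i, v.asInstanceOf[BooleanWritable].get)
  case IntegerType => (v, row, i) => row.setInt(i, v.asInstanceOf[IntWritable].get)
  case LongType    => (v, row, i) => row.setLong(i, v.asInstanceOf[LongWritable].get)
  case _           => (v, row, i) => row.setNullAt(i) // stand-in for the complex-type path
}

// Usage: precompute the array once, then the row loop is a plain indexed call.
val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", LongType)))
val unwrappers = schema.map(_.dataType).map(unwrapperFor).toArray
```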
[GitHub] spark pull request #19752: [SPARK-22520][SQL] Support code generation for la...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19752#discussion_r152482401

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala ---

@@ -211,111 +231,61 @@ abstract class CaseWhenBase(
     val elseCase = elseValue.map(" ELSE " + _.sql).getOrElse("")
     "CASE" + cases + elseCase + " END"
   }
-}
-
-
-/**
- * Case statements of the form "CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END".
- * When a = true, returns b; when c = true, returns d; else returns e.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "CASE WHEN expr1 THEN expr2 [WHEN expr3 THEN expr4]* [ELSE expr5] END - When `expr1` = true, returns `expr2`; else when `expr3` = true, returns `expr4`; else returns `expr5`.",
-  arguments = """
-    Arguments:
-      * expr1, expr3 - the branch condition expressions should all be boolean type.
-      * expr2, expr4, expr5 - the branch value expressions and else value expression should all be
-          same type or coercible to a common type.
-  """,
-  examples = """
-    Examples:
-      > SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-       1
-      > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
-       2
-      > SELECT CASE WHEN 1 < 0 THEN 1 WHEN 2 < 0 THEN 2.0 ELSE null END;
-       NULL
-  """)
-// scalastyle:on line.size.limit
-case class CaseWhen(
-    val branches: Seq[(Expression, Expression)],
-    val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with CodegenFallback with Serializable {
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    super[CodegenFallback].doGenCode(ctx, ev)
-  }
-
-  def toCodegen(): CaseWhenCodegen = {
-    CaseWhenCodegen(branches, elseValue)
-  }
-}
-
-/**
- * CaseWhen expression used when code generation condition is satisfied.
- * OptimizeCodegen optimizer replaces CaseWhen into CaseWhenCodegen.
- *
- * @param branches seq of (branch condition, branch value)
- * @param elseValue optional value for the else branch
- */
-case class CaseWhenCodegen(
-    val branches: Seq[(Expression, Expression)],
-    val elseValue: Option[Expression] = None)
-  extends CaseWhenBase(branches, elseValue) with Serializable {

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    // Generate code that looks like:
-    //
-    // condA = ...
-    // if (condA) {
-    //   valueA
-    // } else {
-    //   condB = ...
-    //   if (condB) {
-    //     valueB
-    //   } else {
-    //     condC = ...
-    //     if (condC) {
-    //       valueC
-    //     } else {
-    //       elseValue
-    //     }
-    //   }
-    // }
+    val conditionMet = ctx.freshName("caseWhenConditionMet")
+    ctx.addMutableState("boolean", ev.isNull, "")
+    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
     val cases = branches.map { case (condExpr, valueExpr) =>
       val cond = condExpr.genCode(ctx)
       val res = valueExpr.genCode(ctx)
       s"""
-        ${cond.code}
-        if (!${cond.isNull} && ${cond.value}) {
-          ${res.code}
-          ${ev.isNull} = ${res.isNull};
-          ${ev.value} = ${res.value};
+        if(!$conditionMet) {
+          ${cond.code}
+          if (!${cond.isNull} && ${cond.value}) {
+            ${res.code}
+            ${ev.isNull} = ${res.isNull};
+            ${ev.value} = ${res.value};
+            $conditionMet = true;
+          }
         }
       """
     }

-    var generatedCode = cases.mkString("", "\nelse {\n", "\nelse {\n")
-
-    elseValue.foreach { elseExpr =>
+    val elseCode = elseValue.map { elseExpr =>
       val res = elseExpr.genCode(ctx)
-      generatedCode +=
-        s"""
+      s"""
+        if(!$conditionMet) {
          ${res.code}
          ${ev.isNull} = ${res.isNull};
          ${ev.value} = ${res.value};
-        """
-    }
+        }
+      """
+    }.getOrElse("")

-    generatedCode += "}\n" * cases.size
+    val casesCode = if (ctx.INPUT_ROW == null || ctx.currentVars != null) {
+      cases.mkString("\n")
+    } else {
+      ctx.splitExpressions(cases, "caseWhen",

--- End diff --

In almost all the cases, we do not need to call `splitExpressions` after merging the PR https://github.com/apache/spark
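The point of the rewrite quoted above is that the old nested if/else chain cannot be cut at arbitrary points, while a flat sequence of blocks guarded by one mutable flag can be handed to `splitExpressions` and spread across generated methods. Roughly, the emitted Java changes shape like this (an illustrative sketch written as the kind of Scala string the codegen produces; names follow the diff):

```scala
// Each WHEN branch and the ELSE branch becomes an independent, guarded block.
// caseWhenConditionMet, isNull and value live as fields on the generated
// class, so the blocks still communicate after being split into methods.
val emittedShape =
  """
    |if (!caseWhenConditionMet) {
    |  /* evaluate condition 1; on match set isNull/value, flag = true */
    |}
    |if (!caseWhenConditionMet) {
    |  /* evaluate condition 2; ... */
    |}
    |if (!caseWhenConditionMet) {
    |  /* ELSE branch */
    |}
  """.stripMargin
```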
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user vinodkc commented on the issue: https://github.com/apache/spark/pull/19779 @gatorsmile, @cloud-fan and @dongjoon-hyun, thanks for the review comments and guidance. Sure, I'll submit a separate PR for backporting it to 2.2.
[GitHub] spark pull request #19651: [SPARK-20682][SPARK-15474][SPARK-21791] Add new O...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19651#discussion_r152480591

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala --- (the same `OrcDeserializer` hunk quoted in full above, anchored at its fallback case:)

+      case _ =>
+        val unwrapper = getValueUnwrapper(dataType)
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row(ordinal) = unwrapper(value)

--- End diff --

@cloud-fan. The current way is the old ORC way. Do we need to introduce the Parquet way for some performance reason?
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19792 Merged build finished. Test PASSed.
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19792 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84095/ Test PASSed.
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19792 What about `ArrayType` or `MapType`, or a deeply nested `StructType`?
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19792

**[Test build #84095 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84095/testReport)** for PR 19792 at commit [`518fdd4`](https://github.com/apache/spark/commit/518fdd4f3d0e968cef2e3ba1b0220daee5ee7778).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r152480007

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---

@@ -318,16 +318,26 @@ case class AlterTableChangeColumnCommand(
         s"'${newColumn.name}' with type '${newColumn.dataType}'")
     }

-    val newSchema = table.schema.fields.map { field =>
+    val typeChanged = originColumn.dataType != newColumn.dataType
+    val newDataSchema = table.dataSchema.fields.map { field =>
       if (field.name == originColumn.name) {
-        // Create a new column from the origin column with the new comment.
-        addComment(field, newColumn.getComment)
+        // Add the comment to a column, if comment is empty, return the original column.
+        val newField = newColumn.getComment.map(field.withComment(_)).getOrElse(field)
+        if (typeChanged) {
+          newField.copy(dataType = newColumn.dataType)
+        } else {
+          newField
+        }
       } else {
         field
       }
     }
-    val newTable = table.copy(schema = StructType(newSchema))
-    catalog.alterTable(newTable)
+    val newTable = table.copy(schema = StructType(newDataSchema ++ table.partitionSchema))
+    if (typeChanged) {
+      catalog.alterTableDataSchema(tableName, StructType(newDataSchema))

--- End diff --

What is Hive's behavior if users change the column type of a partition column?
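For reference, the command being extended here is the Hive-style column-change DDL; previously only the comment could change, and this PR additionally routes a changed type through `alterTableDataSchema`. Usage looks something like this (illustrative table and column names, assuming a `spark` SparkSession in scope):

```scala
// Comment-only change: previously the only supported case.
spark.sql("ALTER TABLE t CHANGE COLUMN c c INT COMMENT 'new comment'")

// Type change: newly handled, but only for data columns. Whether a
// *partition* column's type may change is exactly the open question above.
spark.sql("ALTER TABLE t CHANGE COLUMN c c BIGINT")
```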
[GitHub] spark pull request #19651: [SPARK-20682][SPARK-15474][SPARK-21791] Add new O...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19651#discussion_r152479902

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala --- (the same `OrcDeserializer` hunk quoted in full above, anchored inside `deserialize` at:)

+        if (names.contains(name)) {

--- End diff --

1. I moved the `column-name` logic out of the `while` loop, but it still requires `OrcStruct` because we don't have a `StructObjectInspector`. So we cannot move this out of `iter.map`.
2. This is about top-level fields. Could you tell me what you mean by `nested fields` here?
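The `column-name` logic in question maps a required field either to its real name or, for Hive-written files whose ORC schemas only carry positional names, to `_col<index>` derived from the full data schema. Isolated as a helper, it amounts to (an illustrative sketch; `physicalName` is not a name from the PR):

```scala
import org.apache.spark.sql.types.StructType

// Physical ORC field name for a logical column: the real name when the file
// schema has it, otherwise Hive's positional name "_col<i>", where i is the
// column's position in the table's data schema.
def physicalName(orcFieldNames: Seq[String], dataSchema: StructType, name: String): String =
  if (orcFieldNames.contains(name)) name
  else "_col" + dataSchema.fieldIndex(name)
```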
[GitHub] spark issue #19651: [SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileF...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19651 **[Test build #84097 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84097/testReport)** for PR 19651 at commit [`0666d30`](https://github.com/apache/spark/commit/0666d3046640519c3e2ee5eb0dfd56d3bcc08cba).
[GitHub] spark pull request #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Supp...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19779
[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19468 **[Test build #84096 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84096/testReport)** for PR 19468 at commit [`3b587b4`](https://github.com/apache/spark/commit/3b587b4b4362f184b148c22522821ef7b163717e).
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19779 Thanks! Merged to master. @vinodkc Could you submit a separate PR for backporting it to 2.2?
[GitHub] spark pull request #19792: [SPARK-22566][PYTHON] Better error message for `_...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19792#discussion_r152478594

--- Diff: python/pyspark/sql/types.py ---

@@ -1108,19 +1109,22 @@ def _has_nulltype(dt):
     return isinstance(dt, NullType)


-def _merge_type(a, b):
+def _merge_type(a, b, name=None):

--- End diff --

When does this take the `name` parameter?
[GitHub] spark issue #19767: [SPARK-22543][SQL] fix java 64kb compile error for deepl...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19767 LGTM except the above comments.
[GitHub] spark pull request #19767: [SPARK-22543][SQL] fix java 64kb compile error fo...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19767#discussion_r152476972

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---

@@ -105,6 +105,36 @@ abstract class Expression extends TreeNode[Expression] {
     val isNull = ctx.freshName("isNull")
     val value = ctx.freshName("value")
     val ve = doGenCode(ctx, ExprCode("", isNull, value))
+
+    // TODO: support whole stage codegen too
+    if (ve.code.trim.length > 1024 && ctx.INPUT_ROW != null && ctx.currentVars == null) {

--- End diff --

Could you change `1024` to `1`? Just to check whether all the tests can pass.
[GitHub] spark pull request #19767: [SPARK-22543][SQL] fix java 64kb compile error fo...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19767#discussion_r152470168

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---

@@ -105,6 +105,36 @@ abstract class Expression extends TreeNode[Expression] {
     val isNull = ctx.freshName("isNull")
     val value = ctx.freshName("value")
     val ve = doGenCode(ctx, ExprCode("", isNull, value))
+
+    // TODO: support whole stage codegen too
+    if (ve.code.trim.length > 1024 && ctx.INPUT_ROW != null && ctx.currentVars == null) {
+      val setIsNull = if (ve.isNull != "false" && ve.isNull != "true") {
+        val globalIsNull = ctx.freshName("globalIsNull")
+        ctx.addMutableState("boolean", globalIsNull, s"$globalIsNull = false;")

--- End diff --

-> `ctx.JAVA_BOOLEAN`
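That is, use the `CodegenContext` type constant instead of a raw string literal for the mutable state's Java type; the quoted line would become (a one-line rewrite of the diff's own call):

```scala
ctx.addMutableState(ctx.JAVA_BOOLEAN, globalIsNull, s"$globalIsNull = false;")
```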
[GitHub] spark pull request #19767: [SPARK-22543][SQL] fix java 64kb compile error fo...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19767#discussion_r152477535

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---

@@ -105,6 +105,36 @@ abstract class Expression extends TreeNode[Expression] {
     val isNull = ctx.freshName("isNull")
     val value = ctx.freshName("value")
     val ve = doGenCode(ctx, ExprCode("", isNull, value))
+
+    // TODO: support whole stage codegen too
+    if (ve.code.trim.length > 1024 && ctx.INPUT_ROW != null && ctx.currentVars == null) {
+      val setIsNull = if (ve.isNull != "false" && ve.isNull != "true") {
+        val globalIsNull = ctx.freshName("globalIsNull")
+        ctx.addMutableState("boolean", globalIsNull, s"$globalIsNull = false;")
+        val localIsNull = ve.isNull
+        ve.isNull = globalIsNull
+        s"$globalIsNull = $localIsNull;"
+      } else {
+        ""
+      }
+
+      val javaType = ctx.javaType(dataType)
+      val newValue = ctx.freshName("value")
+
+      val funcName = ctx.freshName(nodeName)
+      val funcFullName = ctx.addNewFunction(funcName,
+        s"""
+           |private $javaType $funcName(InternalRow ${ctx.INPUT_ROW}) {
+           |  ${ve.code.trim}
+           |  $setIsNull
+           |  return ${ve.value};
+           |}
+         """.stripMargin)
+
+      ve.value = newValue
+      ve.code = s"$javaType $newValue = $funcFullName(${ctx.INPUT_ROW});"

--- End diff --

Create a separate function for this?
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19792 **[Test build #84095 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84095/testReport)** for PR 19792 at commit [`518fdd4`](https://github.com/apache/spark/commit/518fdd4f3d0e968cef2e3ba1b0220daee5ee7778).
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19792 ok to test
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19779 Merged build finished. Test PASSed.
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19779 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84092/ Test PASSed.
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19779

**[Test build #84092 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84092/testReport)** for PR 19779 at commit [`51999d0`](https://github.com/apache/spark/commit/51999d01a4bd24be46c2f900a583e0b149ce6e2f).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19779 **[Test build #84094 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84094/testReport)** for PR 19779 at commit [`e3651ef`](https://github.com/apache/spark/commit/e3651ef06d3ac3232b447df5d450632d8fde8ce2).
[GitHub] spark pull request #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Supp...
Github user vinodkc commented on a diff in the pull request: https://github.com/apache/spark/pull/19779#discussion_r152474528

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala ---

@@ -841,6 +841,76 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

+    test(s"$version: SPARK-17920: Insert into/overwrite avro table") {
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val schemaPath = s"""$path${File.separator}avroschemadir"""
+
+        new File(schemaPath).mkdir()
+        val avroSchema =
+          """{
+            |  "name": "test_record",
+            |  "type": "record",
+            |  "fields": [ {
+            |    "name": "f0",
+            |    "type": [
+            |      "null",
+            |      {
+            |        "precision": 38,
+            |        "scale": 2,
+            |        "type": "bytes",
+            |        "logicalType": "decimal"
+            |      }
+            |    ]
+            |  } ]
+            |}
+          """.stripMargin
+        val schemaUrl = s"""$schemaPath${File.separator}avroDecimal.avsc"""
+        val schemaFile = new File(schemaPath, "avroDecimal.avsc")
+        val writer = new PrintWriter(schemaFile)
+        writer.write(avroSchema)
+        writer.close()
+
+        val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+        val srcLocation = new File(url.getFile)
+        val destTableName = "tab1"
+        val srcTableName = "tab2"
+
+        withTable(srcTableName, destTableName) {
+          versionSpark.sql(
+            s"""
+               |CREATE EXTERNAL TABLE $srcTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$srcLocation'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+             """.stripMargin
+          )
+
+          versionSpark.sql(
+            s"""
+               |CREATE TABLE $destTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+             """.stripMargin
+          )
+          versionSpark.sql(
+            s"""INSERT OVERWRITE TABLE $destTableName SELECT * FROM $srcTableName""".stripMargin)
+          val result = versionSpark.table(srcTableName).collect()
+          assert(versionSpark.table(destTableName).collect() === result)
+          versionSpark.sql(
+            s"""INSERT INTO TABLE $destTableName SELECT * FROM $srcTableName""".stripMargin)

--- End diff --

Updated
[GitHub] spark pull request #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Supp...
Github user vinodkc commented on a diff in the pull request: https://github.com/apache/spark/pull/19779#discussion_r152473900

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala --- (the same `VersionsSuite` hunk quoted in full above, anchored at:)

+          versionSpark.sql(
+            s"""INSERT OVERWRITE TABLE $destTableName SELECT * FROM $srcTableName""".stripMargin)

--- End diff --

Sure, I'll update it
[GitHub] spark pull request #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Supp...
Github user vinodkc commented on a diff in the pull request: https://github.com/apache/spark/pull/19779#discussion_r152473845

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala --- (the same `VersionsSuite` hunk quoted in full above, anchored at:)

+          versionSpark.sql(
+            s"""INSERT OVERWRITE TABLE $destTableName SELECT * FROM $srcTableName""".stripMargin)

--- End diff --

@gatorsmile, I tried to remove `stripMargin`, but I am getting: org.apache.spark.sql.catalyst.parser.ParseException: extraneous input '|' expecting {'(', 'SELECT', 'FROM', 'ADD', ...}
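For what it's worth, `stripMargin` only removes leading whitespace up to and including a `|` on each line of the string, so the ParseException above (complaining about a stray '|') suggests the call was removed from one of the multi-line `CREATE TABLE` strings, which do carry margin characters; on the single-line `INSERT` strings it is a no-op, which is presumably what the "useless" comment further down refers to. A quick illustration:

```scala
// '|' margins are stripped, yielding "CREATE TABLE t (c INT)\nSTORED AS PARQUET":
val multiLine =
  """CREATE TABLE t (c INT)
    |STORED AS PARQUET""".stripMargin

// No '|' anywhere, so stripMargin changes nothing here:
val singleLine = """INSERT OVERWRITE TABLE dst SELECT * FROM src""".stripMargin
```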
[GitHub] spark issue #19781: [SPARK-22445][SQL][FOLLOW-UP] Respect stream-side child'...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19781 Merged build finished. Test PASSed.
[GitHub] spark issue #19781: [SPARK-22445][SQL][FOLLOW-UP] Respect stream-side child'...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19781 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84091/ Test PASSed.
[GitHub] spark issue #19781: [SPARK-22445][SQL][FOLLOW-UP] Respect stream-side child'...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19781

**[Test build #84091 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84091/testReport)** for PR 19781 at commit [`d2b149b`](https://github.com/apache/spark/commit/d2b149bb392e7fd38b734b9ae120c92b9f0ece48).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19756 ping @cloud-fan Please take a look. Thanks.
[GitHub] spark issue #19621: [SPARK-11215][ML] Add multiple columns support to String...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19621 **[Test build #84093 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84093/testReport)** for PR 19621 at commit [`031f53f`](https://github.com/apache/spark/commit/031f53fbd1c112d8f0b37bb29e847cd3184498c6).
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19518 @cloud-fan Is it better to use this PR, or to create a new one?
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19779 cc @felixcheung This sounds critical for Spark 2.2 too.
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19779 LGTM
[GitHub] spark pull request #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Supp...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19779#discussion_r152465384

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala --- (the same `VersionsSuite` hunk quoted in full above, anchored at:)

+          versionSpark.sql(
+            s"""INSERT INTO TABLE $destTableName SELECT * FROM $srcTableName""".stripMargin)

--- End diff --

The same here.
[GitHub] spark pull request #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Supp...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19779#discussion_r152465374

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala --- (the same `VersionsSuite` hunk quoted in full above, anchored at:)

+          versionSpark.sql(
+            s"""INSERT OVERWRITE TABLE $destTableName SELECT * FROM $srcTableName""".stripMargin)

--- End diff --

`stripMargin` is useless here.
[GitHub] spark pull request #19791: [SPARK-22572] [Spark shell] spark-shell does not ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19791#discussion_r152464410

--- Diff: repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala ---

@@ -217,4 +218,13 @@ class ReplSuite extends SparkFunSuite {
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
   }
+
+  test(":replay should work correctly") {
+    runInterpreter("local",
+      """
+        |sc
+        |:replay
+      """.stripMargin) should not include "error: not found: value sc"

--- End diff --

Could we just use `assert...` instead?
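That is, match the suite's existing assertion helpers (visible in the surrounding diff context) rather than introducing a ScalaTest matcher; for example:

```scala
// Uses ReplSuite's own helpers, as in the tests just above the new one.
val output = runInterpreter("local",
  """
    |sc
    |:replay
  """.stripMargin)
assertDoesNotContain("error: not found: value sc", output)
```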
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19779 **[Test build #84092 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84092/testReport)** for PR 19779 at commit [`51999d0`](https://github.com/apache/spark/commit/51999d01a4bd24be46c2f900a583e0b149ce6e2f).
[GitHub] spark pull request #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Supp...
Github user vinodkc commented on a diff in the pull request: https://github.com/apache/spark/pull/19779#discussion_r152464029

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala ---

@@ -841,6 +841,75 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }

+    test(s"$version: SPARK-17920: Insert into/overwrite external avro table") {
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val schemaPath = s"""$path${File.separator}avroschemadir"""
+
+        new File(schemaPath).mkdir()
+        val avroSchema =
+          """{
+            |  "name": "test_record",
+            |  "type": "record",
+            |  "fields": [ {
+            |    "name": "f0",
+            |    "type": [
+            |      "null",
+            |      {
+            |        "precision": 38,
+            |        "scale": 2,
+            |        "type": "bytes",
+            |        "logicalType": "decimal"
+            |      }
+            |    ]
+            |  } ]
+            |}
+          """.stripMargin
+        val schemaurl = s"""$schemaPath${File.separator}avroDecimal.avsc"""
+        new java.io.PrintWriter(schemaurl) { write(avroSchema); close() }
+        val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+        val srcLocation = new File(url.getFile)
+        val destTableName = "tab1"
+        val srcTableName = "tab2"
+
+        withTable(srcTableName, destTableName) {
+          versionSpark.sql(
+            s"""
+               |CREATE TABLE $srcTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$srcLocation'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaurl')
+             """.stripMargin
+          )
+          val destLocation = s"""$path${File.separator}destTableLocation"""
+          new File(destLocation).mkdir()
+
+          versionSpark.sql(
+            s"""
+               |CREATE TABLE $destTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$destLocation'

--- End diff --

Thanks, I've updated the test case to test only managed tables and avoided creating a temp directory.
[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19468 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84088/ Test PASSed.
[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19468 Merged build finished. Test PASSed.
[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19468

**[Test build #84088 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84088/testReport)** for PR 19468 at commit [`b75b413`](https://github.com/apache/spark/commit/b75b4136352d4606a41ce2b3fe1c7e31fdf71ffc).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19779 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84090/ Test PASSed.
[GitHub] spark issue #19776: [SPARK-22548][SQL] Incorrect nested AND expression pushe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19776 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84087/ Test PASSed.
[GitHub] spark issue #19776: [SPARK-22548][SQL] Incorrect nested AND expression pushe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19776 Merged build finished. Test PASSed.
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19779 Merged build finished. Test PASSed.
[GitHub] spark issue #19776: [SPARK-22548][SQL] Incorrect nested AND expression pushe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19776 **[Test build #84087 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84087/testReport)** for PR 19776 at commit [`7a19ac6`](https://github.com/apache/spark/commit/7a19ac63fcdae6b67ff989ca90d4a3652c7d02f3). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19779 **[Test build #84090 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84090/testReport)** for PR 19779 at commit [`083e1b3`](https://github.com/apache/spark/commit/083e1b39d733e1b3a1c47a7a87c74b9dfa8444e7). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19746: [SPARK-22346][ML] VectorSizeHint Transformer for using V...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19746 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19746: [SPARK-22346][ML] VectorSizeHint Transformer for using V...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19746 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84089/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19746: [SPARK-22346][ML] VectorSizeHint Transformer for using V...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19746 **[Test build #84089 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84089/testReport)** for PR 19746 at commit [`2b1ed0a`](https://github.com/apache/spark/commit/2b1ed0a3a85385f9b4042415889335942b65b9c9). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152458539 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = +sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX, +"executor label") + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( +!executorLabels.contains(SPARK_ROLE_LABEL), +s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + "executor annotation") + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockManagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + 
private val executorMemoryString = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) --- End diff -- We are already translating it to a k8s-specific setting at line 117. For reference: ``` val executorMemoryQuantity = new QuantityBuilder(false) .withAmount(s"${executorMemoryMiB}Mi") ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152458560 --- Diff: resource-managers/kubernetes/core/src/test/resources/log4j.properties --- @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from a few verbose libraries. +log4j.logger.com.sun.jersey=WARN +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.mortbay=WARN +log4j.logger.org.spark_project.jetty=WARN --- End diff -- Should we, given that this is for the unit tests? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152458551 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s + +import org.apache.spark.SparkConf + +private[spark] object ConfigurationUtils { + + /** + * Extract and parse Spark configuration properties with a given name prefix and + * return the result as a Map. Keys must not have more than one value. + * + * @param sparkConf Spark configuration + * @param prefix the given property name prefix + * @param configType a descriptive note on the type of entities of interest + * @return a Map storing the configuration property keys and values + */ + def parsePrefixedKeyValuePairs( + sparkConf: SparkConf, + prefix: String, + configType: String): Map[String, String] = { --- End diff -- We are not really using it in the context of this PR. Removed this parameter. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19781: [SPARK-22445][SQL][FOLLOW-UP] Respect stream-side...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19781#discussion_r152455868 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala --- @@ -857,4 +857,29 @@ class JoinSuite extends QueryTest with SharedSQLContext { joinQueries.foreach(assertJoinOrdering) } + + test("SPARK-22445 Respect stream-side child's needCopyResult in BroadcastHashJoin") { +val df1 = Seq((2, 3), (2, 5), (2, 2), (3, 8), (2, 1)).toDF("k", "v1") +val df2 = Seq((2, 8), (3, 7), (3, 4), (1, 2)).toDF("k", "v2") +val df3 = Seq((1, 1), (3, 2), (4, 3), (5, 1)).toDF("k", "v3") + +withSQLConf( +SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", +SQLConf.JOIN_REORDER_ENABLED.key -> "false") { + val df = df1.join(df2, "k").join(functions.broadcast(df3), "k") + val plan = df.queryExecution.sparkPlan + + // Check if `needCopyResult` in `BroadcastHashJoin` is correct when smj->bhj --- End diff -- `q6` also failed when smj->bhj --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19781: [SPARK-22445][SQL][FOLLOW-UP] Respect children's needCop...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19781 **[Test build #84091 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84091/testReport)** for PR 19781 at commit [`d2b149b`](https://github.com/apache/spark/commit/d2b149bb392e7fd38b734b9ae120c92b9f0ece48). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19767: [SPARK-22543][SQL] fix java 64kb compile error for deepl...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19767 will review it tonight. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19588: [SPARK-12375][ML] VectorIndexerModel support hand...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19588#discussion_r152454669 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala --- @@ -311,22 +342,39 @@ class VectorIndexerModel private[ml] ( // TODO: Check more carefully about whether this whole class will be included in a closure. /** Per-vector transform function */ - private val transformFunc: Vector => Vector = { + private lazy val transformFunc: Vector => Vector = { --- End diff -- @MrBago Using `lazy val` generates the closure only once; with `def`, every call to `transformFunc` would generate the closure again. Your concern is that if a user calls setParams on this Model, the new param won't take effect. But I think this is not an issue, because `Model` does not allow users to set params; the params are copied from the estimator, aren't they? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
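To make the `lazy val` vs. `def` distinction above concrete, here is a minimal standalone Scala sketch (the object and names are hypothetical, not from the PR): a `lazy val` evaluates its right-hand side once on first access and caches the resulting closure, while a `def` re-evaluates it on every call.
```scala
object ClosureDemo {
  var builds = 0

  // The block runs once, on first access; the same closure is reused afterwards.
  lazy val onceFunc: Int => Int = {
    builds += 1
    x => x + 1
  }

  // The block runs on every access, allocating a fresh closure each time.
  def freshFunc: Int => Int = {
    builds += 1
    x => x + 1
  }

  def main(args: Array[String]): Unit = {
    onceFunc(1); onceFunc(2)
    println(builds) // prints 1: the lazy val body ran a single time
    freshFunc(1); freshFunc(2)
    println(builds) // prints 3: the def body ran twice more
  }
}
```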
[GitHub] spark issue #19786: [SPARK-22559][CORE]history server: handle exception on o...
Github user gengliangwang commented on the issue: https://github.com/apache/spark/pull/19786 retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19776: [SPARK-22548][SQL] Incorrect nested AND expression pushe...
Github user jliwork commented on the issue: https://github.com/apache/spark/pull/19776 @gatorsmile @cloud-fan @viirya @HyukjinKwon Thanks a lot! =) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152450525 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = +sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX, +"executor label") + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( +!executorLabels.contains(SPARK_ROLE_LABEL), +s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + "executor annotation") + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockManagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + 
private val executorMemoryString = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) --- End diff -- Maybe update the doc of this configuration to describe the difference when using the Kubernetes backend? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19370: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19370 In `if not exist "%FIND_SPARK_HOME_PYTHON_SCRIPT%" (`, I switched the condition to match https://github.com/apache/spark/blob/a36a76ac43c36a3b897a748bd9f138b629dbc684/bin/find-spark-home#L27 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19370: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19370 I suggest this: ```cmd rem Path to Python script finding SPARK_HOME set FIND_SPARK_HOME_PYTHON_SCRIPT=%~dp0find_spark_home.py rem Default to standard python interpreter unless told otherwise set PYTHON_RUNNER=python rem If PYSPARK_DRIVER_PYTHON is set, it overwrites the python version if not "x%PYSPARK_DRIVER_PYTHON%"=="x" ( set PYTHON_RUNNER=%PYSPARK_DRIVER_PYTHON% ) rem If PYSPARK_PYTHON is set, it overwrites the python version if not "x%PYSPARK_PYTHON%"=="x" ( set PYTHON_RUNNER=%PYSPARK_PYTHON% ) rem If there is python installed, trying to use the root dir as SPARK_HOME where %PYTHON_RUNNER% > nul 2>&1 if %ERRORLEVEL% neq 0 ( if not exist %PYTHON_RUNNER% ( echo Missing Python executable '%PYTHON_RUNNER%', defaulting to '%~dp0..' for SPARK_HOME. ^ Please install Python or specify the correct Python executable for PYSPARK_DRIVER_PYTHON ^ or PYSPARK_PYTHON. if "x%SPARK_HOME%"=="x" ( set SPARK_HOME=%~dp0.. ) ) ) rem Only attempt to find SPARK_HOME if it is not set. if "x%SPARK_HOME%"=="x" ( rem We are pip installed, use the Python script to resolve a reasonable SPARK_HOME if not exist "%FIND_SPARK_HOME_PYTHON_SCRIPT%" ( rem If we are not in the same directory as find_spark_home.py we are not pip installed so we don't rem need to search the different Python directories for a Spark installation. rem Note only that, if the user has pip installed PySpark but is directly calling pyspark-shell or rem spark-submit in another directory we want to use that version of PySpark rather than the rem pip installed version of PySpark. set SPARK_HOME=%~dp0.. ) else ( rem If there is no python installed it will fail with message: rem 'python' is not recognized as an internal or external command, for /f "delims=" %%i in ('%PYTHON_RUNNER% %FIND_SPARK_HOME_PYTHON_SCRIPT%') do set SPARK_HOME=%%i ) ) ``` I manually tested each branch. This addresses the concern in https://github.com/apache/spark/pull/19370#discussion_r152449365. The error message looks like: ```cmd C:\...>pyspark ``` ``` Missing Python executable 'C:\foo\bar.exe', defaulting to 'C:\Python27\Scripts\..' for SPARK_HOME. Please install Python or specify the correct Python executable for PYSPARK_DRIVER_PYTHON or PYSPARK_PYTHON. ``` ```cmd C:\...>pyspark ``` ``` Missing Python executable 'bar', defaulting to 'C:\Python27\Scripts\..' for SPARK_HOME. Please install Python or specify the correct Python executable for PYSPARK_DRIVER_PYTHON or PYSPARK_PYTHON. ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19370: [SPARK-22495] Fix setup of SPARK_HOME variable on...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19370#discussion_r152449365 --- Diff: bin/find-spark-home.cmd --- @@ -0,0 +1,56 @@ +@echo off + +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +remhttp://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. +rem + +rem Path to Python script finding SPARK_HOME +set FIND_SPARK_HOME_PYTHON_SCRIPT=%~dp0find_spark_home.py + +rem Default to standard python interpreter unless told otherwise +set PYTHON_RUNNER=python +rem If PYSPARK_DRIVER_PYTHON is set, it overwrites the python version +if not "x%PYSPARK_DRIVER_PYTHON%"=="x" ( + set PYTHON_RUNNER=%PYSPARK_DRIVER_PYTHON% +) +rem If PYSPARK_PYTHON is set, it overwrites the python version +if not "x%PYSPARK_PYTHON%"=="x" ( + set PYTHON_RUNNER=%PYSPARK_PYTHON% +) + +rem If there is python installed, trying to use the root dir as SPARK_HOME +where %PYTHON_RUNNER% +if %ERRORLEVEL% neq 0 ( + echo %PYTHON_RUNNER% wasn't found; Python doesn't seem to be installed +if "x%SPARK_HOME%"=="x" ( + set SPARK_HOME=%~dp0.. +) +) --- End diff -- I think the problems here from the last commit are: - now `PYTHON_RUNNER` can't be an absolute path, as `where` does not work with one ``` ERROR: Invalid pattern is specified in "path:pattern". C:\Python27\python.exe wasn't found; Python doesn't seem to be installed ``` - it prints out the output from `where`: ``` C:\...>pyspark C:\cygwin\bin\python C:\Python27\python.exe ... ``` - and the error message looks no more useful than the previous one: ``` python wasn't found; Python doesn't seem to be installed ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152449333 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = +sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX, +"executor label") + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( +!executorLabels.contains(SPARK_ROLE_LABEL), +s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + "executor annotation") + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockManagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + 
private val executorMemoryString = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) --- End diff -- `executorMemoryMiB` is an int value representing the number of mebibytes. I think we should translate it to whatever k8s accepts as a byte string, to keep the same semantics. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
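For context on why the exact byte-string format matters here, a small illustrative sketch (the values are assumptions, not from the PR): Kubernetes reads a decimal `M` suffix as 10^6 bytes but a power-of-two `Mi` suffix as 2^20 bytes, so Spark's internal MiB count has to be rendered with `Mi` (as the snippet quoted earlier in this thread does) to denote the same amount of memory.
```scala
// Spark parses spark.executor.memory into a MiB count internally; the suffix
// chosen when handing that count to k8s decides the actual byte amount.
val executorMemoryMiB = 1024L
val asMi = s"${executorMemoryMiB}Mi" // 1024 * 2^20 bytes, matches Spark's meaning
val asM  = s"${executorMemoryMiB}M"  // 1024 * 10^6 bytes, roughly 5% less memory
```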
[GitHub] spark pull request #19767: [SPARK-22543][SQL] fix java 64kb compile error fo...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19767#discussion_r152449180 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -105,6 +105,41 @@ abstract class Expression extends TreeNode[Expression] { val isNull = ctx.freshName("isNull") val value = ctx.freshName("value") val ve = doGenCode(ctx, ExprCode("", isNull, value)) + + // TODO: support whole stage codegen too + if (ve.code.trim.length > 1024 && ctx.INPUT_ROW != null && ctx.currentVars == null) { +val setIsNull = if (ve.isNull != "false" && ve.isNull != "true") { + val globalIsNull = ctx.freshName("globalIsNull") + ctx.addMutableState("boolean", globalIsNull, s"$globalIsNull = false;") + val localIsNull = ve.isNull + ve.isNull = globalIsNull + s"$globalIsNull = $localIsNull;" +} else { + "" +} + +val setValue = { + val globalValue = ctx.freshName("globalValue") + ctx.addMutableState( +ctx.javaType(dataType), globalValue, s"$globalValue = ${ctx.defaultValue(dataType)};") + val localValue = ve.value + ve.value = globalValue + s"$globalValue = $localValue;" +} + +val funcName = ctx.freshName(nodeName) +val funcFullName = ctx.addNewFunction(funcName, + s""" + |private void $funcName(InternalRow ${ctx.INPUT_ROW}) { + | ${ve.code.trim} + | $setValue --- End diff -- I originally thought we could avoid the overhead by using a thread-local singleton, but it's a bit weird, so the current code looks good. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19776: [SPARK-22548][SQL] Incorrect nested AND expressio...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19776 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19779#discussion_r152448746 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala --- @@ -841,6 +841,75 @@ class VersionsSuite extends SparkFunSuite with Logging { } } +test(s"$version: SPARK-17920: Insert into/overwrite external avro table") { + withTempDir { dir => +val path = dir.getAbsolutePath +val schemaPath = s"""$path${File.separator}avroschemadir""" + +new File(schemaPath).mkdir() +val avroSchema = + """{ +| "name": "test_record", +| "type": "record", +| "fields": [ { +|"name": "f0", +|"type": [ +| "null", +| { +|"precision": 38, +|"scale": 2, +|"type": "bytes", +|"logicalType": "decimal" +| } +|] +| } ] +|} + """.stripMargin +val schemaurl = s"""$schemaPath${File.separator}avroDecimal.avsc""" +new java.io.PrintWriter(schemaurl) { write(avroSchema); close() } +val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal") +val srcLocation = new File(url.getFile) +val destTableName = "tab1" +val srcTableName = "tab2" + +withTable(srcTableName, destTableName) { + versionSpark.sql( +s""" + |CREATE TABLE $srcTableName + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true') + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + |LOCATION '$srcLocation' + |TBLPROPERTIES ('avro.schema.url' = '$schemaurl') + """.stripMargin + ) + val destLocation = s"""$path${File.separator}destTableLocation""" + new File(destLocation).mkdir() + + versionSpark.sql( +s""" + |CREATE TABLE $destTableName + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true') + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + |LOCATION '$destLocation' --- End diff -- we can just test the managed table, to avoid creating a temp directory for external table. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19776: [SPARK-22548][SQL] Incorrect nested AND expression pushe...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19776 Thanks! Merged to master/2.2/2.1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152448480 --- Diff: resource-managers/kubernetes/core/src/test/resources/log4j.properties --- @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from a few verbose libraries. +log4j.logger.com.sun.jersey=WARN +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.mortbay=WARN +log4j.logger.org.spark_project.jetty=WARN --- End diff -- shall we also put these in `config/log4j.properties.template`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19776: [SPARK-22548][SQL] Incorrect nested AND expression pushe...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19776 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19781: [SPARK-22445][SQL][FOLLOW-UP] Respect children's needCop...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19781 I finally found a failure case with a simple query. I'll update soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19518 yea, ok @kiszk I'll review your work. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19601: [SPARK-22383][SQL] Generate code to directly get value o...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19601 @cloud-fan could you please review this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19776: [SPARK-22548][SQL] Incorrect nested AND expression pushe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19776 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19776: [SPARK-22548][SQL] Incorrect nested AND expression pushe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19776 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84086/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19776: [SPARK-22548][SQL] Incorrect nested AND expression pushe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19776 **[Test build #84086 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84086/testReport)** for PR 19776 at commit [`a0b3d4e`](https://github.com/apache/spark/commit/a0b3d4e990cd7024b532593bca321499001fc89b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19518 @cloud-fan I want to take this over if possible. cc @maropu --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Supp...
Github user vinodkc commented on a diff in the pull request: https://github.com/apache/spark/pull/19779#discussion_r152446382 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala --- @@ -841,6 +841,75 @@ class VersionsSuite extends SparkFunSuite with Logging { } } +test(s"$version: SPARK-17920: Insert into/overwrite external avro table") { + withTempDir { dir => +val path = dir.getAbsolutePath +val schemaPath = s"""$path${File.separator}avroschemadir""" + +new File(schemaPath).mkdir() +val avroSchema = + """{ +| "name": "test_record", +| "type": "record", +| "fields": [ { +|"name": "f0", +|"type": [ +| "null", +| { +|"precision": 38, +|"scale": 2, +|"type": "bytes", +|"logicalType": "decimal" +| } +|] +| } ] +|} + """.stripMargin +val schemaurl = s"""$schemaPath${File.separator}avroDecimal.avsc""" +new java.io.PrintWriter(schemaurl) { write(avroSchema); close() } +val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal") +val srcLocation = new File(url.getFile) +val destTableName = "tab1" +val srcTableName = "tab2" + +withTable(srcTableName, destTableName) { + versionSpark.sql( +s""" + |CREATE TABLE $srcTableName + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true') + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + |LOCATION '$srcLocation' + |TBLPROPERTIES ('avro.schema.url' = '$schemaurl') + """.stripMargin + ) + val destLocation = s"""$path${File.separator}destTableLocation""" + new File(destLocation).mkdir() + + versionSpark.sql( +s""" + |CREATE TABLE $destTableName + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true') + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + |LOCATION '$destLocation' --- End diff -- @cloud-fan, this bug affects both external and managed tables. I've added a new test case for the managed table too. However, to avoid code duplication, should I include both tests inside the same test method? Please suggest. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19779: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support wri...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19779 **[Test build #84090 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84090/testReport)** for PR 19779 at commit [`083e1b3`](https://github.com/apache/spark/commit/083e1b39d733e1b3a1c47a7a87c74b9dfa8444e7). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152446177 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = +sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX, +"executor label") + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( +!executorLabels.contains(SPARK_ROLE_LABEL), +s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + "executor annotation") + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockManagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + 
private val executorMemoryString = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) --- End diff -- Just for reference: > Limits and requests for memory are measured in bytes. You can express memory as a plain integer or as a fixed-point integer using one of these suffixes: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-memory --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152445984 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = +sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX, +"executor label") + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( +!executorLabels.contains(SPARK_ROLE_LABEL), +s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + "executor annotation") + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockManagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + 
private val executorMemoryString = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) --- End diff -- We bias towards keeping Spark configuration consistent across resource managers and then translating it to what Kubernetes expects. Should we instead accept Kubernetes-formatted memory strings and do the translation when setting the environment variable? Regardless, there is going to be a difference between the string we send to the JVM in `-Xmx` and `-Xms` and what we pass to Kubernetes. We could translate everything to just raw bytes to keep everything consistent. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
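A short sketch of that last raw-bytes option, assuming (per the Kubernetes docs quoted above) that a plain integer quantity is read as a byte count; the value and names here are illustrative only:
```scala
// Derive both the JVM flag and the k8s quantity from one byte count so the
// two sides cannot disagree on how a suffix is parsed.
val executorMemoryMiB   = 1024L
val executorMemoryBytes = executorMemoryMiB * 1024L * 1024L
val jvmHeapFlag         = s"-Xmx$executorMemoryBytes"  // -Xmx accepts a raw byte count
val k8sMemoryAmount     = executorMemoryBytes.toString // a plain integer means bytes in k8s
```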
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152445535 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.Utils + +/** + * A factory class for configuring and creating executor pods. + */ +private[spark] trait ExecutorPodFactory { + + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = +sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX, +"executor label") + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + " Spark.") + require( +!executorLabels.contains(SPARK_ROLE_LABEL), +s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + "executor annotation") + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockManagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + + private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + 
private val executorMemoryString = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) --- End diff -- IIUC, you are trying to bypass Spark's byte string parsing and pass the byte string to k8s directly. This may be confusing, as `EXECUTOR_MEMORY` is an existing Spark config. E.g. when users specify `100mb` for this config, they expect executors to get 100mb of memory whatever the resource manager is. If k8s parses `100mb` differently, the behavior may be unexpected. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
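For reference, Spark's own parsing of such byte strings is fixed regardless of the cluster manager; a quick sketch (note that `org.apache.spark.util.Utils` is `private[spark]`, so this only compiles inside Spark itself):
```scala
import org.apache.spark.util.Utils

// "100mb" always means 100 mebibytes to Spark, whichever cluster manager
// launches the executors; re-parsing the raw string on the k8s side could
// silently allocate a different amount.
val mib = Utils.byteStringAsMb("100mb") // 100
```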
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152444685
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Utils
+
+/**
+ * A factory class for configuring and creating executor pods.
+ */
+private[spark] trait ExecutorPodFactory {
+
+  /**
+   * Configure and construct an executor pod with the given parameters.
+   */
+  def createExecutorPod(
+      executorId: String,
+      applicationId: String,
+      driverUrl: String,
+      executorEnvs: Seq[(String, String)],
+      driverPod: Pod,
+      nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath =
+    sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+
+  private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
+    sparkConf,
+    KUBERNETES_EXECUTOR_LABEL_PREFIX,
+    "executor label")
+  require(
+    !executorLabels.contains(SPARK_APP_ID_LABEL),
+    s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
+  require(
+    !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+    s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
+      " Spark.")
+  require(
+    !executorLabels.contains(SPARK_ROLE_LABEL),
+    s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
+
+  private val executorAnnotations =
+    ConfigurationUtils.parsePrefixedKeyValuePairs(
+      sparkConf,
+      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+      "executor annotation")
+  private val nodeSelector =
+    ConfigurationUtils.parsePrefixedKeyValuePairs(
+      sparkConf,
+      KUBERNETES_NODE_SELECTOR_PREFIX,
+      "node selector")
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
+  private val blockManagerPort = sparkConf
+    .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+
+  private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
--- End diff --
`executorMemoryMiB` is the number of MiB. I'm not sure what k8s expects, but passing `executorMemoryMiB + "Mi"` should be pretty safe?
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
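For context on the memory question above: Kubernetes quantities use "Mi" as the suffix for mebibytes, while plain "M" means megabytes and "MB" is not a valid suffix at all, so the suffix has to match the unit of `EXECUTOR_MEMORY` (which is MiB). A minimal sketch using the fabric8 client's `QuantityBuilder`, with a hypothetical hard-coded value standing in for the real config lookup, and assuming the fabric8 version of this era where `Quantity` carries the amount as a single string:

```scala
import io.fabric8.kubernetes.api.model.{Quantity, QuantityBuilder}

// Hypothetical stand-in for sparkConf.get(EXECUTOR_MEMORY), which is in MiB.
val executorMemoryMiB: Long = 1024L

// "Mi" is the Kubernetes suffix for mebibytes; "MB" would be rejected.
val executorMemoryQuantity: Quantity = new QuantityBuilder(false)
  .withAmount(s"${executorMemoryMiB}Mi")
  .build()
```

The PR may build the quantity differently; the point is only that the binary suffix keeps the pod's memory request consistent with the MiB-denominated Spark setting.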
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152443965
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SparkConf
+
+private[spark] object ConfigurationUtils {
+
+  /**
+   * Extract and parse Spark configuration properties with a given name prefix and
+   * return the result as a Map. Keys must not have more than one value.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @param configType a descriptive note on the type of entities of interest
+   * @return a Map storing the configuration property keys and values
+   */
+  def parsePrefixedKeyValuePairs(
+      sparkConf: SparkConf,
+      prefix: String,
+      configType: String): Map[String, String] = {
--- End diff --
where do we use this parameter?
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
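To make the question concrete: `SparkConf.getAllWithPrefix` already returns the (stripped key, value) pairs under a prefix, so the body can be a one-liner, which leaves `configType` useful only if the method reports per-entry errors. A sketch of that shape, not the PR's actual body:

```scala
import org.apache.spark.SparkConf

// Sketch only: assumes the method just collects properties under `prefix`.
// `configType` is unused here, which is exactly what the reviewer is probing.
def parsePrefixedKeyValuePairs(
    sparkConf: SparkConf,
    prefix: String,
    configType: String): Map[String, String] = {
  // getAllWithPrefix strips `prefix` from the returned keys.
  sparkConf.getAllWithPrefix(prefix).toMap
}
```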
[GitHub] spark issue #19767: [SPARK-22543][SQL] fix java 64kb compile error for deepl...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19767 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19767: [SPARK-22543][SQL] fix java 64kb compile error for deepl...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19767 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84085/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19767: [SPARK-22543][SQL] fix java 64kb compile error for deepl...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19767
**[Test build #84085 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84085/testReport)** for PR 19767 at commit [`d126977`](https://github.com/apache/spark/commit/d126977bbdd221287b0825fa78c04b1065d97ab1).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r152443488
--- Diff: pom.xml ---
@@ -2648,6 +2648,13 @@
+
+    <profile>
+      <id>kubernetes</id>
+      <modules>
+        <module>resource-managers/kubernetes/core</module>
+      </modules>
+    </profile>
--- End diff --
See https://github.com/apache/spark/pull/19468#discussion_r145250461
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
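For readers who have not built Spark with optional modules before: a Maven profile like this is opt-in, so the Kubernetes core module only compiles when the profile is activated explicitly, for example with `./build/mvn -Pkubernetes -DskipTests package` (a typical invocation, not one quoted from the PR).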