[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79114194 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, BasicColStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. + */ +case class AnalyzeColumnCommand( +tableIdent: TableIdentifier, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) + +// check correctness of column names +val validColumns = mutable.MutableList[NamedExpression]() +val resolver = sessionState.conf.resolver +columnNames.foreach { col => + val exprOption = relation.resolve(col.split("\\."), resolver) + if (exprOption.isEmpty) { +throw new AnalysisException(s"Invalid column name: $col") + } + if (validColumns.map(_.exprId).contains(exprOption.get.exprId)) { +throw new AnalysisException(s"Duplicate column name: $col") + } + validColumns += exprOption.get +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException("ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + // Collect statistics per column. + // The first element in the result will be the overall row count, the following elements + // will be structs containing all column stats. + // The layout of each struct follows the layout of the BasicColStats. 
+ val ndvMaxErr = sessionState.conf.ndvMaxError + val expressions = Count(Literal(1)).toAggregateExpression() +: +validColumns.map(ColumnStatsStruct(_, ndvMaxErr)) + val namedExpressions = expressions.map(e => Alias(e, e.toString)()) + val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)) +.queryExecution.toRdd.collect().head + + // unwrap the result + val rowCount = statsRow.getLong(0) + val colStats = validColumns.zipWithIndex.map { case (expr, i) => +val colInfo = statsRow.getStruct(i + 1, ColumnStatsStruct.statsNumber) +val colStats = ColumnStatsStruct.unwrapRow(expr, colInfo) +(expr.name, colStats) + }.toMap + + val statistics = +Statistics(sizeInBytes = newTotalSize, rowCount = Some(rowCount), basicColStats = colStats) --- End diff -- One question. Seems we overwrite all existing column statistics, no
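If the intent were to keep previously collected stats for columns outside this ANALYZE run, the merge could look roughly like the sketch below. This is illustrative only, written against the `Statistics`/`BasicColStats` shape introduced in this patch; `newColStats`, `newTotalSize`, and `rowCount` stand for the values computed in the command above, and the merge itself is not part of the PR.

```scala
// Hypothetical merge: keep column stats collected by earlier ANALYZE runs and
// overwrite only the columns analyzed in this run.
val mergedColStats: Map[String, BasicColStats] =
  catalogTable.stats.map(_.basicColStats).getOrElse(Map.empty) ++ newColStats

val statistics = Statistics(
  sizeInBytes = newTotalSize,
  rowCount = Some(rowCount),
  basicColStats = mergedColStats)
```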
[GitHub] spark pull request #14971: [SPARK-17410] [SPARK-17284] Move Hive-generated S...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14971#discussion_r79113972 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala --- @@ -111,23 +111,8 @@ private[hive] case class MetastoreRelation( @transient override lazy val statistics: Statistics = { catalogTable.stats.getOrElse(Statistics( sizeInBytes = { -val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE) -val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE) --- End diff -- In the master branch, we do not use Hive-generated `numRows`... Let me fix it in this PR.
[GitHub] spark issue #11119: [SPARK-10780][ML] Add an initial model to kmeans
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/11119 **[Test build #65478 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65478/consoleFull)** for PR 11119 at commit [`eb7fbbe`](https://github.com/apache/spark/commit/eb7fbbea3a68135442c5088ccc6972b6c50b8f51).
[GitHub] spark issue #14971: [SPARK-17410] [SPARK-17284] Move Hive-generated Stats In...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/14971 Let me write a test case to ensure this works correctly, and also add more comments in the code.
[GitHub] spark issue #15090: [SPARK-17073] [SQL] generate column-level statistics
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15090 **[Test build #65477 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65477/consoleFull)** for PR 15090 at commit [`9cdc722`](https://github.com/apache/spark/commit/9cdc722780445c62ed2c306ede0ebc4829dbdbe8).
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79113407 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.plans.logical.{BasicColStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. + */ +case class AnalyzeColumnCommand( +tableName: String, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) + +// check correctness for column names +val attributeNames = relation.output.map(_.name.toLowerCase) +val invalidColumns = columnNames.filterNot { col => attributeNames.contains(col.toLowerCase)} +if (invalidColumns.nonEmpty) { + throw new AnalysisException(s"Invalid columns for table $tableName: $invalidColumns.") +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException(s"ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val lowerCaseNames = columnNames.map(_.toLowerCase) + val attributes = +relation.output.filter(attr => lowerCaseNames.contains(attr.name.toLowerCase)) + + // collect column statistics + val aggColumns = mutable.ArrayBuffer[Column](count(Column("*"))) + attributes.foreach(entry => aggColumns ++= statsAgg(entry.name, entry.dataType)) + val statsRow: InternalRow = Dataset.ofRows(sparkSession, relation).select(aggColumns: _*) +.queryExecution.toRdd.collect().head + + // We also update table-level stats to prevent inconsistency in case of table 
modification + // between the two ANALYZE commands for collecting table-level stats and column-level stats. + val rowCount = statsRow.getLong(0) + var newStats: Statistics = if (catalogTable.stats.isDefined) { +catalogTable.stats.get.copy(sizeInBytes = newTotalSize, rowCount = Some(rowCount)) + } else { +Statistics(sizeInBytes = newTotalSize, rowCount = Some(rowCount)) + } + + var pos = 1 + val colStats = mutable.HashMap[String, BasicColStats]() + attributes.foreach { attr => +attr.dataType match { + case n: NumericType => +colStats += attr.name -> BasicColStats( + dataType = attr.dataType, + numNulls = rowCount - statsRow.getLong(pos + NumericStatsAgg.numNotNullsIndex), + max = Option(statsRow.get(pos + NumericStatsAgg.maxIndex, attr.dat
[GitHub] spark issue #14971: [SPARK-17410] [SPARK-17284] Move Hive-generated Stats In...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/14971 It does not break the existing behavior. If the MetastoreRelation has the Hive-generated table statistics, we create the statistics [here](https://github.com/gatorsmile/spark/blob/9e18ba104527d2bb14331f4b51194002dabb2556/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L405-L422). If we have Spark-generated statistics, we overwrite the Hive-generated ones in [restoreTableMetadata](https://github.com/apache/spark/blob/d6eede9a36766e2d2294951b054d7557008a5662/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L482-L484). Thus, the current code completely matches what you want. : )
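In other words, the precedence being described boils down to something like the sketch below. This is illustrative only — `hiveGenerated` stands for the statistics built from Hive's table parameters in `HiveClientImpl`, and `sparkGenerated` for the statistics restored from table properties in `restoreTableMetadata`; it is not the actual catalog code.

```scala
import org.apache.spark.sql.catalyst.plans.logical.Statistics

// Spark-generated statistics, when present, override the Hive-generated ones.
def effectiveStats(
    hiveGenerated: Option[Statistics],
    sparkGenerated: Option[Statistics]): Option[Statistics] = {
  sparkGenerated.orElse(hiveGenerated)
}
```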
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79113294 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.plans.logical.{BasicColStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. + */ +case class AnalyzeColumnCommand( +tableName: String, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) + +// check correctness for column names +val attributeNames = relation.output.map(_.name.toLowerCase) +val invalidColumns = columnNames.filterNot { col => attributeNames.contains(col.toLowerCase)} +if (invalidColumns.nonEmpty) { + throw new AnalysisException(s"Invalid columns for table $tableName: $invalidColumns.") +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException(s"ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val lowerCaseNames = columnNames.map(_.toLowerCase) + val attributes = +relation.output.filter(attr => lowerCaseNames.contains(attr.name.toLowerCase)) + + // collect column statistics + val aggColumns = mutable.ArrayBuffer[Column](count(Column("*"))) + attributes.foreach(entry => aggColumns ++= statsAgg(entry.name, entry.dataType)) + val statsRow: InternalRow = Dataset.ofRows(sparkSession, relation).select(aggColumns: _*) +.queryExecution.toRdd.collect().head + + // We also update table-level stats to prevent inconsistency in case of table 
modification + // between the two ANALYZE commands for collecting table-level stats and column-level stats. + val rowCount = statsRow.getLong(0) + var newStats: Statistics = if (catalogTable.stats.isDefined) { +catalogTable.stats.get.copy(sizeInBytes = newTotalSize, rowCount = Some(rowCount)) + } else { +Statistics(sizeInBytes = newTotalSize, rowCount = Some(rowCount)) + } + + var pos = 1 + val colStats = mutable.HashMap[String, BasicColStats]() + attributes.foreach { attr => +attr.dataType match { + case n: NumericType => +colStats += attr.name -> BasicColStats( + dataType = attr.dataType, + numNulls = rowCount - statsRow.getLong(pos + NumericStatsAgg.numNotNullsIndex), + max = Option(statsRow.get(pos + NumericStatsAgg.maxIndex, attr.dat
[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/15090#discussion_r79112828 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.plans.logical.{BasicColStats, Statistics} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ + + +/** + * Analyzes the given columns of the given table in the current database to generate statistics, + * which will be used in query optimizations. + */ +case class AnalyzeColumnCommand( +tableName: String, +columnNames: Seq[String]) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val sessionState = sparkSession.sessionState +val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName) +val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent)) + +// check correctness for column names +val attributeNames = relation.output.map(_.name.toLowerCase) +val invalidColumns = columnNames.filterNot { col => attributeNames.contains(col.toLowerCase)} +if (invalidColumns.nonEmpty) { + throw new AnalysisException(s"Invalid columns for table $tableName: $invalidColumns.") +} + +relation match { + case catalogRel: CatalogRelation => +updateStats(catalogRel.catalogTable, + AnalyzeTableCommand.calculateTotalSize(sparkSession, catalogRel.catalogTable)) + + case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => +updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes) + + case otherRelation => +throw new AnalysisException(s"ANALYZE TABLE is not supported for " + + s"${otherRelation.nodeName}.") +} + +def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { + val lowerCaseNames = columnNames.map(_.toLowerCase) + val attributes = +relation.output.filter(attr => lowerCaseNames.contains(attr.name.toLowerCase)) + + // collect column statistics + val aggColumns = mutable.ArrayBuffer[Column](count(Column("*"))) + attributes.foreach(entry => aggColumns ++= statsAgg(entry.name, entry.dataType)) + val statsRow: InternalRow = Dataset.ofRows(sparkSession, relation).select(aggColumns: _*) +.queryExecution.toRdd.collect().head + + // We also update table-level stats to prevent inconsistency in case of table 
modification + // between the two ANALYZE commands for collecting table-level stats and column-level stats. + val rowCount = statsRow.getLong(0) + var newStats: Statistics = if (catalogTable.stats.isDefined) { +catalogTable.stats.get.copy(sizeInBytes = newTotalSize, rowCount = Some(rowCount)) + } else { +Statistics(sizeInBytes = newTotalSize, rowCount = Some(rowCount)) + } + + var pos = 1 + val colStats = mutable.HashMap[String, BasicColStats]() + attributes.foreach { attr => +attr.dataType match { + case n: NumericType => +colStats += attr.name -> BasicColStats( + dataType = attr.dataType, + numNulls = rowCount - statsRow.getLong(pos + NumericStatsAgg.numNotNullsIndex), + max = Option(statsRow.get(pos + NumericStatsAgg.maxIndex, att
[GitHub] spark issue #15090: [SPARK-17073] [SQL] generate column-level statistics
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15090 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65475/ Test FAILed.
[GitHub] spark issue #15090: [SPARK-17073] [SQL] generate column-level statistics
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15090 Merged build finished. Test FAILed.
[GitHub] spark issue #15090: [SPARK-17073] [SQL] generate column-level statistics
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15090 **[Test build #65475 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65475/consoleFull)** for PR 15090 at commit [`761a9e0`](https://github.com/apache/spark/commit/761a9e0641ac6e6f4798d4be988ded0875b974a0). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class AnalyzeTableCommand(`
[GitHub] spark issue #15116: [SPARK-17559][MLLIB]persist edges if their storage level...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15116 Merged build finished. Test PASSed.
[GitHub] spark issue #15116: [SPARK-17559][MLLIB]persist edges if their storage level...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15116 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65474/ Test PASSed.
[GitHub] spark issue #15116: [SPARK-17559][MLLIB]persist edges if their storage level...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15116 **[Test build #65474 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65474/consoleFull)** for PR 15116 at commit [`ad29af4`](https://github.com/apache/spark/commit/ad29af46b34d2d156078aba48b8e0427136fc6dd). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #15073: [SPARK-17518] [SQL] Block Users to Specify the Internal ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15073 Merged build finished. Test PASSed.
[GitHub] spark issue #15073: [SPARK-17518] [SQL] Block Users to Specify the Internal ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15073 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65472/ Test PASSed.
[GitHub] spark issue #15073: [SPARK-17518] [SQL] Block Users to Specify the Internal ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15073 **[Test build #65472 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65472/consoleFull)** for PR 15073 at commit [`44f335b`](https://github.com/apache/spark/commit/44f335bb6d8bd17a29fd516b3a1b79aed9d1452d). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #14467: [SPARK-16861][PYSPARK][CORE] Refactor PySpark acc...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/14467#discussion_r79108466 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala --- @@ -866,11 +866,14 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By } /** - * Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it + * Internal class that acts as an `AccumulatorV2` for Python accumulators. Inside, it * collects a list of pickled strings that we pass to Python through a socket. */ -private class PythonAccumulatorParam(@transient private val serverHost: String, serverPort: Int) - extends AccumulatorParam[JList[Array[Byte]]] { +private[spark] class PythonAccumulatorV2(@transient private val serverHost: String, serverPort: Int) + extends AccumulatorV2[JList[Array[Byte]], JList[Array[Byte]]] { --- End diff -- ping @srowen
[GitHub] spark issue #15064: [SPARK-17509][SQL]When wrapping catalyst datatype to Hiv...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/15064 I have a dumb question. I saw that your PR description shows CPU time measurements for the wrapping functions. Could you share them with the community? Finding the critical paths is very important for further performance improvement. We might need to regularly monitor the top N to avoid potential performance regressions.
[GitHub] spark issue #14834: [SPARK-17163][ML] Unified LogisticRegression interface
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14834 **[Test build #65476 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65476/consoleFull)** for PR 14834 at commit [`38fad98`](https://github.com/apache/spark/commit/38fad988956458aac59109613c7d468855a0faf8).
[GitHub] spark issue #15090: [SPARK-17073] [SQL] generate column-level statistics
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15090 **[Test build #65475 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65475/consoleFull)** for PR 15090 at commit [`761a9e0`](https://github.com/apache/spark/commit/761a9e0641ac6e6f4798d4be988ded0875b974a0).
[GitHub] spark pull request #15100: [SPARK-17317][SparkR] Add SparkR vignette to bran...
Github user junyangq closed the pull request at: https://github.com/apache/spark/pull/15100
[GitHub] spark issue #15100: [SPARK-17317][SparkR] Add SparkR vignette to branch 2.0
Github user junyangq commented on the issue: https://github.com/apache/spark/pull/15100 Sure, thanks @shivaram
[GitHub] spark issue #15116: [SPARK-17559][MLLIB]persist edges if their storage level...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15116 **[Test build #65474 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65474/consoleFull)** for PR 15116 at commit [`ad29af4`](https://github.com/apache/spark/commit/ad29af46b34d2d156078aba48b8e0427136fc6dd).
[GitHub] spark issue #14762: [SPARK-16962][CORE][SQL] Fix misaligned record accesses ...
Github user sumansomasundar commented on the issue: https://github.com/apache/spark/pull/14762 made the changes @srowen
[GitHub] spark pull request #15116: [SPARK-17559][MLLIB]persist edges if their storag...
GitHub user dding3 opened a pull request: https://github.com/apache/spark/pull/15116 [SPARK-17559][MLLIB]persist edges if their storage level is none in PeriodicGraphCheckpointer ## What changes were proposed in this pull request? When PeriodicGraphCheckpointer is used to persist a graph, the edges are sometimes not persisted, because the graph is currently persisted only when the vertices' storage level is none. However, the vertices' storage level can be non-none while the edges' storage level is none, e.g. for a graph created by an outerJoinVertices operation, where the vertices are automatically cached while the edges are not. In that case the edges will not be persisted when we use PeriodicGraphCheckpointer to persist the graph. We need to check the edges' storage level separately and persist the edges if their level is none. ## How was this patch tested? Manual tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dding3/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15116.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15116 commit ad29af46b34d2d156078aba48b8e0427136fc6dd Author: ding Date: 2016-09-15T21:39:10Z persist edges if their storage level is none
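A minimal sketch of the check the description proposes, assuming GraphX's `Graph` API (illustrative only, not the actual PeriodicGraphCheckpointer patch):

```scala
import org.apache.spark.graphx.Graph
import org.apache.spark.storage.StorageLevel

// Check vertices and edges independently: e.g. outerJoinVertices leaves the
// vertices cached while the edges may still have StorageLevel.NONE.
def persistGraphIfNeeded[VD, ED](graph: Graph[VD, ED]): Unit = {
  if (graph.vertices.getStorageLevel == StorageLevel.NONE) {
    graph.vertices.persist()
  }
  if (graph.edges.getStorageLevel == StorageLevel.NONE) {
    graph.edges.persist()
  }
}
```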
[GitHub] spark pull request #15043: [SPARK-17491] Close serialization stream to fix w...
Github user srinathshankar commented on a diff in the pull request: https://github.com/apache/spark/pull/15043#discussion_r79105677 --- Diff: core/src/test/scala/org/apache/spark/storage/PartiallyUnrolledIteratorSuite.scala --- @@ -33,7 +33,7 @@ class PartiallyUnrolledIteratorSuite extends SparkFunSuite with MockitoSugar { val rest = (unrollSize until restSize + unrollSize).iterator val memoryStore = mock[MemoryStore] -val joinIterator = new PartiallyUnrolledIterator(memoryStore, unrollSize, unroll, rest) +val joinIterator = new PartiallyUnrolledIterator(memoryStore, ON_HEAP, unrollSize, unroll, rest) --- End diff -- We should look into trying to test this with OFF_HEAP as well.
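One way to cover both modes without duplicating the assertions is to parameterize the test over `MemoryMode`. The sketch below is illustrative only; it assumes the suite's existing fixtures (`memoryStore`, `unrollSize`, `unroll`, `rest`) stay in scope and uses a placeholder test name.

```scala
import org.apache.spark.memory.MemoryMode

// Run the same scenario for both memory modes instead of hard-coding ON_HEAP.
Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { mode =>
  test(s"join two iterators ($mode)") {
    val joinIterator =
      new PartiallyUnrolledIterator(memoryStore, mode, unrollSize, unroll, rest)
    // ...existing assertions on joinIterator go here...
  }
}
```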
[GitHub] spark pull request #15043: [SPARK-17491] Close serialization stream to fix w...
Github user srinathshankar commented on a diff in the pull request: https://github.com/apache/spark/pull/15043#discussion_r79105418 --- Diff: core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.nio.ByteBuffer + +import scala.reflect.ClassTag + +import org.mockito.Mockito +import org.mockito.Mockito.atLeastOnce +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} + +import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, TaskContextImpl} +import org.apache.spark.memory.MemoryMode +import org.apache.spark.serializer.{JavaSerializer, SerializationStream, SerializerManager} +import org.apache.spark.storage.memory.{MemoryStore, PartiallySerializedBlock, RedirectableOutputStream} +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream} +import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} + +class PartiallySerializedBlockSuite +extends SparkFunSuite +with BeforeAndAfterEach +with PrivateMethodTester { + + private val blockId = new TestBlockId("test") + private val conf = new SparkConf() + private val memoryStore = Mockito.mock(classOf[MemoryStore], Mockito.RETURNS_SMART_NULLS) + private val serializerManager = new SerializerManager(new JavaSerializer(conf), conf) + + private val getSerializationStream = PrivateMethod[SerializationStream]('serializationStream) + private val getRedirectableOutputStream = +PrivateMethod[RedirectableOutputStream]('redirectableOutputStream) + + override protected def beforeEach(): Unit = { +super.beforeEach() +Mockito.reset(memoryStore) + } + + private def partiallyUnroll[T: ClassTag]( + iter: Iterator[T], + numItemsToBuffer: Int): PartiallySerializedBlock[T] = { + +val bbos: ChunkedByteBufferOutputStream = { + val spy = Mockito.spy(new ChunkedByteBufferOutputStream(128, ByteBuffer.allocate)) + Mockito.doAnswer(new Answer[ChunkedByteBuffer] { +override def answer(invocationOnMock: InvocationOnMock): ChunkedByteBuffer = { + Mockito.spy(invocationOnMock.callRealMethod().asInstanceOf[ChunkedByteBuffer]) +} + }).when(spy).toChunkedByteBuffer + spy +} + +val serializer = serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance() +val redirectableOutputStream = Mockito.spy(new RedirectableOutputStream) +redirectableOutputStream.setOutputStream(bbos) +val serializationStream = Mockito.spy(serializer.serializeStream(redirectableOutputStream)) + +(1 to numItemsToBuffer).foreach { _ => + assert(iter.hasNext) + serializationStream.writeObject[T](iter.next()) +} + +val unrollMemory = bbos.size +new 
PartiallySerializedBlock[T]( + memoryStore, + serializerManager, + blockId, + serializationStream = serializationStream, + redirectableOutputStream, + unrollMemory = unrollMemory, + memoryMode = MemoryMode.ON_HEAP, + bbos, + rest = iter, + classTag = implicitly[ClassTag[T]]) + } + + test("valuesIterator() and finishWritingToStream() cannot be called after discard() is called") { +val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2) +partiallySerializedBlock.discard() +intercept[IllegalStateException] { + partiallySerializedBlock.finishWritingToStream(null) +} +intercept[IllegalStateException] { + partiallySerializedBlock.valuesIterator +} + } + + test("discard() can be called more than once") { +val partiallySerializedBlock = partiallyUnroll((1 to
[GitHub] spark issue #15047: [SPARK-17495] [SQL] Add Hash capability semantically equ...
Github user tejasapatil commented on the issue: https://github.com/apache/spark/pull/15047 @rxin : I could, but the test case depends on a few Hive classes for validation. I could either (keep the test case in sql/hive and move HiveHash to sql/catalyst) OR (move both to sql/catalyst and hard-code the expected output in the test case so that I don't need to depend on Hive classes).
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14990 Merged build finished. Test PASSed.
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14990 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65471/ Test PASSed.
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14990 **[Test build #65471 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65471/consoleFull)** for PR 14990 at commit [`d6838d0`](https://github.com/apache/spark/commit/d6838d0a5575caedfc8ffcf7fcead6eb3bc793d1). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #11403: [SPARK-13523] [SQL] Reuse exchanges in a query
Github user viirya commented on the issue: https://github.com/apache/spark/pull/11403 @davies I have a question about this. Maybe you have the answer for it? Thanks. For a shuffle, `ShuffleExchange` returns a cached `ShuffledRowRDD` so that `ReusedExchangeExec` can reuse it. However, the `ShuffledRowRDD` still needs to retrieve the remote blocks again, because the previously retrieved remote blocks are not stored locally. So I am wondering: if we explicitly call `cache` on the `ShuffledRowRDD` that is prepared for reuse, can we skip the next round of remote fetching? Would that improve shuffle exchange reuse?
[GitHub] spark issue #15115: [SPARK-17558] Bump Hadoop 2.7 version from 2.7.2 to 2.7....
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15115 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65470/ Test PASSed.
[GitHub] spark issue #15115: [SPARK-17558] Bump Hadoop 2.7 version from 2.7.2 to 2.7....
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15115 Merged build finished. Test PASSed.
[GitHub] spark issue #15115: [SPARK-17558] Bump Hadoop 2.7 version from 2.7.2 to 2.7....
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15115 **[Test build #65470 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65470/consoleFull)** for PR 15115 at commit [`1fc9047`](https://github.com/apache/spark/commit/1fc90473ca076ee7d6f473dd338e044d87e9351a). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #15053: [Doc] improve python API docstrings
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/15053 @holdenk Personally, I like the package-level docstring more if we can write it well. (If we are not sure about that, then I think we can maybe do this for each as a safe choice, though.)
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14990 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65473/ Test FAILed.
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14990 Merged build finished. Test FAILed.
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14990 **[Test build #65473 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65473/consoleFull)** for PR 14990 at commit [`1451753`](https://github.com/apache/spark/commit/145175393f9d6526429cdada0a1db78ca8fe2998). * This patch **fails MiMa tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #14947: [SPARK-17388][SQL] Support for inferring type date/times...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/14947 ping @davies
[GitHub] spark issue #15049: [SPARK-17310][SQL] Add an option to disable record-level...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/15049 ping @davies and @yhuai
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14990 **[Test build #65473 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65473/consoleFull)** for PR 14990 at commit [`1451753`](https://github.com/apache/spark/commit/145175393f9d6526429cdada0a1db78ca8fe2998).
[GitHub] spark pull request #15064: [SPARK-17509][SQL]When wrapping catalyst datatype...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15064#discussion_r79103035 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala --- @@ -240,74 +240,173 @@ private[hive] trait HiveInspectors { /** * Wraps with Hive types based on object inspector. - * TODO: Consolidate all hive OI/data interface code. */ protected def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any = oi match { -case _: JavaHiveVarcharObjectInspector => +case x: ConstantObjectInspector => (o: Any) => -if (o != null) { - val s = o.asInstanceOf[UTF8String].toString - new HiveVarchar(s, s.length) -} else { - null +x.getWritableConstantValue +case x: PrimitiveObjectInspector => x match { + // TODO we don't support the HiveVarcharObjectInspector yet. + case _: StringObjectInspector if x.preferWritable() => +(o: Any) => getStringWritable(o) + case _: StringObjectInspector => +(o: Any) => if (o != null) o.asInstanceOf[UTF8String].toString() else null + case _: IntObjectInspector if x.preferWritable() => +(o: Any) => getIntWritable(o) + case _: IntObjectInspector => +(o: Any) => if (o != null) o.asInstanceOf[java.lang.Integer] else null --- End diff -- Let's add a helper method to abstract this null checking logic, e.g. ``` def withNullSafe(f: Any => Any): Any => Any = { input => if (input == null) null else f(input) } ```
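For illustration only, the non-writable branches quoted above could then delegate the null check to that helper along these lines (a sketch, not the actual patch):

```scala
import org.apache.spark.unsafe.types.UTF8String

// Null handling factored out once; each branch only supplies the non-null conversion.
def withNullSafe(f: Any => Any): Any => Any =
  input => if (input == null) null else f(input)

// e.g. in wrapperFor:
//   case _: StringObjectInspector =>
//     withNullSafe(o => o.asInstanceOf[UTF8String].toString)
//   case _: IntObjectInspector =>
//     withNullSafe(o => o.asInstanceOf[java.lang.Integer])
```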
[GitHub] spark pull request #15064: [SPARK-17509][SQL]When wrapping catalyst datatype...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15064#discussion_r79102781 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala --- @@ -240,74 +240,173 @@ private[hive] trait HiveInspectors { /** * Wraps with Hive types based on object inspector. - * TODO: Consolidate all hive OI/data interface code. */ protected def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any = oi match { -case _: JavaHiveVarcharObjectInspector => +case x: ConstantObjectInspector => (o: Any) => -if (o != null) { - val s = o.asInstanceOf[UTF8String].toString - new HiveVarchar(s, s.length) -} else { - null +x.getWritableConstantValue +case x: PrimitiveObjectInspector => x match { + // TODO we don't support the HiveVarcharObjectInspector yet. + case _: StringObjectInspector if x.preferWritable() => +(o: Any) => getStringWritable(o) --- End diff -- In `wrap` we will only hit this branch if the input is not null; is it safe to skip the null check here?
[GitHub] spark issue #15064: [SPARK-17509][SQL]When wrapping catalyst datatype to Hiv...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/15064 the title is not fixed yet
[GitHub] spark issue #15073: [SPARK-17518] [SQL] Block Users to Specify the Internal ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/15073 LGTM, pending jenkins
[GitHub] spark issue #15073: [SPARK-17518] [SQL] Block Users to Specify the Internal ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15073 **[Test build #65472 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65472/consoleFull)** for PR 15073 at commit [`44f335b`](https://github.com/apache/spark/commit/44f335bb6d8bd17a29fd516b3a1b79aed9d1452d).
[GitHub] spark pull request #15073: [SPARK-17518] [SQL] Block Users to Specify the In...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15073#discussion_r79102224 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala --- @@ -1151,6 +1152,56 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } } + test("save API - format hive") { --- End diff -- Sure, change all of them to the message `Failed to find data source: hive`
[GitHub] spark pull request #15073: [SPARK-17518] [SQL] Block Users to Specify the In...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15073#discussion_r79102198 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -80,6 +80,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def format(source: String): DataFrameWriter[T] = { +if (source.toLowerCase == "hive") { --- End diff -- Done. Thanks!
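For illustration, a hedged sketch of what the completed guard in `DataFrameWriter.format` could look like; the exception message below is a placeholder and may differ from the wording in the merged patch:

```scala
// Sketch of the body of org.apache.spark.sql.DataFrameWriter.format().
// The error message is a placeholder, not the exact text of the patch.
def format(source: String): DataFrameWriter[T] = {
  if (source.toLowerCase == "hive") {
    throw new AnalysisException(
      "Cannot specify the internal data source 'hive' via the DataFrameWriter API.")
  }
  this.source = source
  this
}
```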
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/14990 LGTM, pending jenkins
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14990 Merged build finished. Test PASSed.
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14990 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65468/ Test PASSed.
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r79099764 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry +import org.apache.spark.sql.internal.SQLConf + +class FileStreamSourceLog( +metadataLogVersion: String, +sparkSession: SparkSession, +path: String) + extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) { + + import CompactibleFileStreamLog._ + + // Configurations about metadata compaction + protected override val compactInterval = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, +s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + protected override val fileCleanupDelayMs = +sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + protected override val isDeletingExpiredLog = +sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private implicit val formats = Serialization.formats(NoTypeHints) + + // A fixed size log cache to cache the file entries belong to the compaction batch. It is used + // to avoid scanning the compacted log file to retrieve it's own batch data. 
+ private val cacheSize = compactInterval + private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[FileEntry]] + + private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = { +if (fileEntryCache.size >= cacheSize) { + fileEntryCache.drop(1) +} + +fileEntryCache.put(batchId, logs) + } + + protected override def serializeData(data: FileEntry): String = { +Serialization.write(data) + } + + protected override def deserializeData(encodedString: String): FileEntry = { +Serialization.read[FileEntry](encodedString) + } + + def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = { +logs + } + + override def add(batchId: Long, logs: Array[FileEntry]): Boolean = { +if (super.add(batchId, logs) && isCompactionBatch(batchId, compactInterval)) { + updateCache(batchId, logs) + true +} else if (!isCompactionBatch(batchId, compactInterval)) { + true +} else { + false +} + } + + override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, Array[FileEntry])] = { +val startBatchId = startId.getOrElse(0L) +val endBatchId = getLatest().map(_._1).getOrElse(0L) + +val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id => + if (isCompactionBatch(id, compactInterval) && fileEntryCache.contains(id)) { +(id, Some(fileEntryCache(id))) + } else { +val logs = super.get(id).map(_.filter(_.batchId == id)) +(id, logs) + } +}.partition(_._2.isDefined) + +// The below code may only be happened when original metadata log file has been removed, so we +// have to get the batch from latest compacted log file. This is quite time-consuming and may +// not be happened in the current FileStreamSource code path, since we only fetch the +// latest metadata log file. +val searchKeys = removedBatches.map(_._1) +val retrievedBatches = if (searchKeys.nonEmpty) { + logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!") + val latestBatchId = getLatest().ma
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14990 **[Test build #65468 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65468/consoleFull)** for PR 14990 at commit [`ccdda37`](https://github.com/apache/spark/commit/ccdda374515e4aa8d818de7290bf968f457c0e51). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #15102: [SPARK-17346][SQL] Add Kafka source for Structured Strea...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/15102

> I pushed for this code to be copied rather than refactored because I think this is the right direction long term. While it is nice to minimize inter-project dependencies, that is not really the motivation. While the code is very similar now, there are a bunch of things I'd like to start changing:

A bigger concern for me at this point is that the code was copied, and then modified in ways that don't seem to have anything to do with the necessities of structured streaming (e.g. your "why is this a nested class" comment). Options from my point of view, from best to worst:

1. Refactor to a common submodule. Based on how little I had to change to get the common functionality in my branch, I think this is going to initially leave most things untouched. If things change in the future, they can be refactored / copied as necessary. I think this minimizes the chance that someone fixes a bug in the DStream cached consumer and forgets to fix it in the sql cached consumer, or vice versa.
2. Copy without changes, make only the minimal changes necessary at first. This is going to make what happened more obvious, and make it easier to maintain changes across both pieces of code.
3. Copy and make unnecessary changes (what seems to have been done currently). This seems like a maintenance nightmare for no gain.

> I don't think that all the classes need to be type parameterized. Our interface SQL has its own type system, analyser, and interface to the type system of the JVM (encoders). We should be using that. Operators in SQL do not type parameterize in general.
> To optimize performance, there are several tricks we might want to play eventually (maybe prefetching data during execution, etc).

Kafka consumers prefetch data already; that's the main reason the CachedKafkaConsumer exists. My thought here is that there isn't much gain to be had with something more than a thin shim around a Kafka rdd, or at least not for a while. Kafka's data model doesn't really allow for much in terms of pushdown optimizations (you basically get to query by offset, or maybe time). About the only idea I've heard that might have promise was Reynold suggesting scheduling straight map jobs as long-running kafka consumers in a poll loop on the executors, to avoid batching latency. But that seems to open a whole can of worms in terms of deterministic behavior, and is probably much further down the road. If we get there, what's the harm in cutting shared dependencies at that point rather than now?

> These are just ideas, but given that DStreams and Structured Streaming have significantly different models and user interfaces, I don't think that we want to tie ourselves to the same internals. If we identify utilities that are needed by both, then we should pull those out and share them.

At this point, the shared need is basically everything except KafkaUtils' static constructors, and the parts of the DirectStream related to the DStream interface. You still need an rdd, a cache for consumers, offset ranges, a way to configure consumers, a way to configure locality, a consumer running on the driver to get latest offsets...

> We don't need to handle the general problem of is kafka Offset A from Topic 1 before or after kafka Offset B from Topic 2.
> Does x: KafkaOffset == y: KafkaOffset (i.e. is there new data since the last time I checked)?

We do need to handle comparing completely different topicpartitions, because it's entirely possible to have a job with a single topicpartition A, which is deleted or unsubscribed, and then a single topicpartition B added, in the space of one batch. I have talked to companies that are actually doing this kind of thing. If all we need to do is be able to tell that one sql offset (that we already knew about) is different from another sql offset (that we just learned about), then I think it's pretty straightforward; your three cases are:

* error: ahead in some common topicpartitions, and behind in others.
* equal: same kafka offsets for the same topicpartitions.
* not equal: different offsets for the same topicpartitions, and/or different topicpartitions.

That does imply that any ordering of sql Offsets is by when we learn about them in processing time, which sounds suspect, but...

> The final version of this Source should almost certainly support wildcards with topicpartitions that change on the fly.

Since it seems this is one of the harder problems to solve, as a strawman, I'd propose that we only support static lists of topics in this PR and possibly even static partitions. I want to get users to kick the tires on structured streaming in general and report what's missing so we can all prioritize our engineering effort.
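The three comparison cases described above can be sketched roughly as follows, assuming a source offset is just a `Map[TopicPartition, Long]`; the names and the error handling are illustrative, not the implementation that was eventually merged:

```scala
import org.apache.kafka.common.TopicPartition

// Illustrative comparison of two Kafka source offsets, following the
// error / equal / not-equal cases outlined in the comment above.
sealed trait OffsetComparison
case object Equal extends OffsetComparison
case object NotEqual extends OffsetComparison
case class Inconsistent(reason: String) extends OffsetComparison

def compareOffsets(
    x: Map[TopicPartition, Long],
    y: Map[TopicPartition, Long]): OffsetComparison = {
  val common = x.keySet.intersect(y.keySet)
  val ahead = common.exists(tp => x(tp) > y(tp))
  val behind = common.exists(tp => x(tp) < y(tp))
  if (ahead && behind) {
    // Ahead in some common topicpartitions and behind in others.
    Inconsistent(s"offsets moved in both directions for ${common.mkString(", ")}")
  } else if (x == y) {
    // Same topicpartitions with the same offsets.
    Equal
  } else {
    // Different offsets for the same topicpartitions, and/or different topicpartitions.
    NotEqual
  }
}
```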
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14990 **[Test build #65471 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65471/consoleFull)** for PR 14990 at commit [`d6838d0`](https://github.com/apache/spark/commit/d6838d0a5575caedfc8ffcf7fcead6eb3bc793d1).
[GitHub] spark issue #15073: [SPARK-17518] [SQL] Block Users to Specify the Internal ...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/15073 OK got it. Thanks.
[GitHub] spark issue #15073: [SPARK-17518] [SQL] Block Users to Specify the Internal ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/15073 Based on my understanding, we are consolidating the write path, including providing a unified CREATE TABLE interface for both Hive serde tables and data source tables. So far, this feature is not ready; more work is needed before we can turn it on. If we do not block it, users will hit many bugs, since the interfaces (e.g., SQL, the DataFrameWriter APIs, and the createExternalTable APIs), DDL execution, and metastore formats are still different. Thus, blocking the `hive` format is needed until we can officially support it. Let me know if my understanding is not right. cc @cloud-fan @yhuai
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14990 Merged build finished. Test PASSed.
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14990 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65467/ Test PASSed.
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14990 **[Test build #65467 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65467/consoleFull)** for PR 14990 at commit [`996e392`](https://github.com/apache/spark/commit/996e39291862a16fc5eea63bc8a05466b77f3cfb). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #14678: [MINOR][SQL] Add missing functions for some options in S...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/14678 Thanks but also FYI this is not "minor". We should have a JIRA ticket for changes like this in the future.
[GitHub] spark issue #15064: [SPARK-17509][SQL]When wrapping catalyst datatype to Hiv...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15064 Merged build finished. Test PASSed.
[GitHub] spark issue #15064: [SPARK-17509][SQL]When wrapping catalyst datatype to Hiv...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15064 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65469/ Test PASSed.
[GitHub] spark issue #15064: [SPARK-17509][SQL]When wrapping catalyst datatype to Hiv...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15064 **[Test build #65469 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65469/consoleFull)** for PR 15064 at commit [`2ec685c`](https://github.com/apache/spark/commit/2ec685cb09b3c51b5f055c856285066a948482ee). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #14912: [SPARK-17357][SQL] Fix current predicate pushdown
Github user viirya commented on the issue: https://github.com/apache/spark/pull/14912 ping @cloud-fan @hvanhovell @srinathshankar again, would you please take a look at this? Thanks.
[GitHub] spark issue #14780: [SPARK-17206][SQL] Support ANALYZE TABLE on analyzable t...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/14780 @hvanhovell Would you like to comment on this? Thanks.
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV ca...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14118 Merged build finished. Test PASSed.
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV ca...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14118 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65466/ Test PASSed.
[GitHub] spark issue #14118: [SPARK-16462][SPARK-16460][SPARK-15144][SQL] Make CSV ca...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14118 **[Test build #65466 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65466/consoleFull)** for PR 14118 at commit [`365cbfb`](https://github.com/apache/spark/commit/365cbfb02b58bc1992a635118ffba6b4e371cb06). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #15115: [SPARK-17558] Bump Hadoop 2.7 version from 2.7.2 to 2.7....
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15115 **[Test build #65470 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65470/consoleFull)** for PR 15115 at commit [`1fc9047`](https://github.com/apache/spark/commit/1fc90473ca076ee7d6f473dd338e044d87e9351a).
[GitHub] spark pull request #15115: [SPARK-17558] Bump Hadoop 2.7 version from 2.7.2 ...
GitHub user rxin opened a pull request: https://github.com/apache/spark/pull/15115 [SPARK-17558] Bump Hadoop 2.7 version from 2.7.2 to 2.7.3 ## What changes were proposed in this pull request? This patch bumps the Hadoop version in hadoop-2.7 profile from 2.7.2 to 2.7.3, which was recently released and contained a number of bug fixes. ## How was this patch tested? The change should be covered by existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rxin/spark SPARK-17558 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15115.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15115 commit 1fc90473ca076ee7d6f473dd338e044d87e9351a Author: Reynold Xin Date: 2016-09-16T02:04:20Z [SPARK-17558] Bump Hadoop 2.7 version from 2.7.2 to 2.7.3
[GitHub] spark issue #15115: [SPARK-17558] Bump Hadoop 2.7 version from 2.7.2 to 2.7....
Github user rxin commented on the issue: https://github.com/apache/spark/pull/15115 cc @JoshRosen @srowen
[GitHub] spark issue #15064: [SPARK-17509][SQL]When wrapping catalyst datatype to Hiv...
Github user sitalkedia commented on the issue: https://github.com/apache/spark/pull/15064 @rxin - fixed, thanks.
[GitHub] spark issue #15064: [SPARK-17509]]When wrapping catalyst datatype to Hive da...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/15064 cc @cloud-fan @hvanhovell
[GitHub] spark issue #15064: [SPARK-17509]]When wrapping catalyst datatype to Hive da...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/15064 @sitalkedia can you fix the title?
[GitHub] spark pull request #15102: [SPARK-17346][SQL] Add Kafka source for Structure...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15102#discussion_r79094763 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala --- @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer + +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.kafka010.KafkaSource._ +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types._ +import org.apache.spark.SparkContext + +/** + * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design + * for this source is as follows. + * + * - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent + * with the semantics of `KafkaConsumer.position()`. + * + * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read + * by this source. These strategies directly correspond to the different consumption options + * in . This class is designed to return a configured + * [[KafkaConsumer]] that is used by the [[KafkaSource]] to query for the offsets. + * See the docs on [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for + * more details. + * + * - The [[KafkaSource]] written to do the following. + * + * - As soon as the source is created, the pre-configured KafkaConsumer returned by the + *[[ConsumerStrategy]] is used to query the initial offsets that this source should + *start reading from. This used to create the first batch. + * + * - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are + * returned as a [[KafkaSourceOffset]]. + * + * - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset' in + * for each partition. The end offset is excluded to be consistent with the semantics of + * [[KafkaSourceOffset]] and `KafkaConsumer.position()`. 
+ * + * - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the + * data from Kafka topic + partition is consistently read by the same executors across + * batches, and cached KafkaConsumers in the executors can be reused efficiently. See the + * docs on [[KafkaSourceRDD]] for more details. + */ +private[kafka010] case class KafkaSource( +sqlContext: SQLContext, +consumerStrategy: ConsumerStrategy[Array[Byte], Array[Byte]], +executorKafkaParams: ju.Map[String, Object], +sourceOptions: Map[String, String]) + extends Source with Logging { + + @transient private val consumer = consumerStrategy.createConsumer() + @transient private val sc = sqlContext.sparkContext + @transient private val initialPartitionOffsets = fetchPartitionOffsets(seekToLatest = false) + logInfo(s"Initial offsets: " + initialPartitionOffsets) + + override def schema: StructType = KafkaSource.kafkaSchema + + /** Returns the maximum available offset for this source. */ + override def getOf
[GitHub] spark issue #15064: [SPARK-17509]]When wrapping catalyst datatype to Hive da...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15064 Build finished. Test PASSed.
[GitHub] spark issue #15064: [SPARK-17509]]When wrapping catalyst datatype to Hive da...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15064 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65463/ Test PASSed.
[GitHub] spark issue #15064: [SPARK-17509]]When wrapping catalyst datatype to Hive da...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15064 **[Test build #65463 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65463/consoleFull)** for PR 15064 at commit [`31b2e6b`](https://github.com/apache/spark/commit/31b2e6bcbb701705f5efaa5fbaa339d3c3b769d5). * This patch passes all tests. * This patch **does not merge cleanly**. * This patch adds no public classes.
[GitHub] spark issue #15043: [SPARK-17491] Close serialization stream to fix wrong an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15043 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65465/ Test FAILed.
[GitHub] spark issue #15043: [SPARK-17491] Close serialization stream to fix wrong an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15043 Merged build finished. Test FAILed.
[GitHub] spark issue #15043: [SPARK-17491] Close serialization stream to fix wrong an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15043 **[Test build #65465 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65465/consoleFull)** for PR 15043 at commit [`0d70774`](https://github.com/apache/spark/commit/0d70774e1db04edb46b312efc4b1646d7201fb03). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #15102: [SPARK-17346][SQL] Add Kafka source for Structure...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15102#discussion_r79093876 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala --- @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer + +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.kafka010.KafkaSource._ +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types._ +import org.apache.spark.SparkContext + +/** + * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design + * for this source is as follows. + * + * - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent + * with the semantics of `KafkaConsumer.position()`. + * + * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read + * by this source. These strategies directly correspond to the different consumption options + * in . This class is designed to return a configured + * [[KafkaConsumer]] that is used by the [[KafkaSource]] to query for the offsets. + * See the docs on [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for + * more details. + * + * - The [[KafkaSource]] written to do the following. + * + * - As soon as the source is created, the pre-configured KafkaConsumer returned by the + *[[ConsumerStrategy]] is used to query the initial offsets that this source should + *start reading from. This used to create the first batch. + * + * - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are + * returned as a [[KafkaSourceOffset]]. + * + * - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset' in + * for each partition. The end offset is excluded to be consistent with the semantics of + * [[KafkaSourceOffset]] and `KafkaConsumer.position()`. 
+ * + * - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the + * data from Kafka topic + partition is consistently read by the same executors across + * batches, and cached KafkaConsumers in the executors can be reused efficiently. See the + * docs on [[KafkaSourceRDD]] for more details. + */ +private[kafka010] case class KafkaSource( +sqlContext: SQLContext, +consumerStrategy: ConsumerStrategy[Array[Byte], Array[Byte]], +executorKafkaParams: ju.Map[String, Object], +sourceOptions: Map[String, String]) + extends Source with Logging { + + @transient private val consumer = consumerStrategy.createConsumer() + @transient private val sc = sqlContext.sparkContext + @transient private val initialPartitionOffsets = fetchPartitionOffsets(seekToLatest = false) + logInfo(s"Initial offsets: " + initialPartitionOffsets) + + override def schema: StructType = KafkaSource.kafkaSchema + + /** Returns the maximum available offset for this source. */ + override def getOf
[GitHub] spark pull request #13513: [SPARK-15698][SQL][Streaming] Add the ability to ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/13513#discussion_r79093102 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.mutable + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry +import org.apache.spark.sql.internal.SQLConf + +class FileStreamSourceLog( +metadataLogVersion: String, +sparkSession: SparkSession, +path: String) + extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) { + + import CompactibleFileStreamLog._ + + // Configurations about metadata compaction + protected override val compactInterval = + sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL) + require(compactInterval > 0, +s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " + + s"positive value.") + + protected override val fileCleanupDelayMs = +sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY) + + protected override val isDeletingExpiredLog = +sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION) + + private implicit val formats = Serialization.formats(NoTypeHints) + + // A fixed size log cache to cache the file entries belong to the compaction batch. It is used + // to avoid scanning the compacted log file to retrieve it's own batch data. + private val cacheSize = compactInterval + private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[FileEntry]] + + private def updateCache(batchId: Long, logs: Array[FileEntry]): Unit = { +if (fileEntryCache.size >= cacheSize) { + fileEntryCache.drop(1) --- End diff -- I see, sorry for this issue.
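A minimal sketch of the kind of fix implied above, assuming the problem is that `drop(1)` on a `mutable.LinkedHashMap` returns a new collection rather than evicting in place; the wrapper class name is hypothetical, and the value type is simplified to `Array[String]` standing in for `Array[FileEntry]` from the PR:

```scala
import scala.collection.mutable

// Hypothetical bounded cache keyed by batch id.
class BoundedEntryCache(cacheSize: Int) {
  private val fileEntryCache = new mutable.LinkedHashMap[Long, Array[String]]

  def updateCache(batchId: Long, logs: Array[String]): Unit = {
    if (fileEntryCache.size >= cacheSize) {
      // drop(1) would only build a copy; remove the eldest key in place instead.
      // LinkedHashMap preserves insertion order, so head is the oldest entry.
      fileEntryCache.remove(fileEntryCache.head._1)
    }
    fileEntryCache.put(batchId, logs)
  }

  def get(batchId: Long): Option[Array[String]] = fileEntryCache.get(batchId)
}
```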
[GitHub] spark pull request #15102: [SPARK-17346][SQL] Add Kafka source for Structure...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/15102#discussion_r79092976 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala --- @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer + +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.kafka010.KafkaSource._ +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types._ +import org.apache.spark.SparkContext + +/** + * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design + * for this source is as follows. + * + * - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent + * with the semantics of `KafkaConsumer.position()`. + * + * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read + * by this source. These strategies directly correspond to the different consumption options + * in . This class is designed to return a configured + * [[KafkaConsumer]] that is used by the [[KafkaSource]] to query for the offsets. + * See the docs on [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for + * more details. + * + * - The [[KafkaSource]] written to do the following. + * + * - As soon as the source is created, the pre-configured KafkaConsumer returned by the + *[[ConsumerStrategy]] is used to query the initial offsets that this source should + *start reading from. This used to create the first batch. + * + * - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are + * returned as a [[KafkaSourceOffset]]. + * + * - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset' in + * for each partition. The end offset is excluded to be consistent with the semantics of + * [[KafkaSourceOffset]] and `KafkaConsumer.position()`. 
+ * + * - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the + * data from Kafka topic + partition is consistently read by the same executors across + * batches, and cached KafkaConsumers in the executors can be reused efficiently. See the + * docs on [[KafkaSourceRDD]] for more details. + */ +private[kafka010] case class KafkaSource( +sqlContext: SQLContext, +consumerStrategy: ConsumerStrategy[Array[Byte], Array[Byte]], +executorKafkaParams: ju.Map[String, Object], +sourceOptions: Map[String, String]) + extends Source with Logging { + + @transient private val consumer = consumerStrategy.createConsumer() + @transient private val sc = sqlContext.sparkContext + @transient private val initialPartitionOffsets = fetchPartitionOffsets(seekToLatest = false) + logInfo(s"Initial offsets: " + initialPartitionOffsets) + + override def schema: StructType = KafkaSource.kafkaSchema + + /** Returns the maximum available offset for this source. */ + override def getOf
[GitHub] spark issue #15073: [SPARK-17518] [SQL] Block Users to Specify the Internal ...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/15073 Curious - why do we want to block it?
[GitHub] spark issue #15064: [SPARK-17509] When wrapping catalyst datatype to Hive da...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15064 **[Test build #65469 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65469/consoleFull)** for PR 15064 at commit [`2ec685c`](https://github.com/apache/spark/commit/2ec685cb09b3c51b5f055c856285066a948482ee).
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14990 **[Test build #65468 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65468/consoleFull)** for PR 14990 at commit [`ccdda37`](https://github.com/apache/spark/commit/ccdda374515e4aa8d818de7290bf968f457c0e51).
[GitHub] spark issue #15064: [SPARK-17509] When wrapping catalyst datatype to Hive da...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15064 Merged build finished. Test PASSed.
[GitHub] spark issue #15064: [SPARK-17509] When wrapping catalyst datatype to Hive da...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15064 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/65464/ Test PASSed.
[GitHub] spark issue #15064: [SPARK-17509] When wrapping catalyst datatype to Hive da...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15064 **[Test build #65464 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65464/consoleFull)** for PR 15064 at commit [`87a96d9`](https://github.com/apache/spark/commit/87a96d93da7e06d04689879fc5a8e1293dc0c54e). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #14990: [SPARK-17426][SQL] Refactor `TreeNode.toJSON` to avoid O...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14990 **[Test build #65467 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/65467/consoleFull)** for PR 14990 at commit [`996e392`](https://github.com/apache/spark/commit/996e39291862a16fc5eea63bc8a05466b77f3cfb).
[GitHub] spark pull request #15102: [SPARK-17346][SQL] Add Kafka source for Structure...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15102#discussion_r79088749 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala --- @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer + +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.kafka010.KafkaSource._ +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types._ +import org.apache.spark.SparkContext + +/** + * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design + * for this source is as follows. + * + * - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent + * with the semantics of `KafkaConsumer.position()`. + * + * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read + * by this source. These strategies directly correspond to the different consumption options + * in . This class is designed to return a configured + * [[KafkaConsumer]] that is used by the [[KafkaSource]] to query for the offsets. + * See the docs on [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for + * more details. + * + * - The [[KafkaSource]] written to do the following. + * + * - As soon as the source is created, the pre-configured KafkaConsumer returned by the + *[[ConsumerStrategy]] is used to query the initial offsets that this source should + *start reading from. This used to create the first batch. + * + * - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are + * returned as a [[KafkaSourceOffset]]. + * + * - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset' in + * for each partition. The end offset is excluded to be consistent with the semantics of + * [[KafkaSourceOffset]] and `KafkaConsumer.position()`. 
+ * + * - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the + * data from Kafka topic + partition is consistently read by the same executors across + * batches, and cached KafkaConsumers in the executors can be reused efficiently. See the + * docs on [[KafkaSourceRDD]] for more details. + */ +private[kafka010] case class KafkaSource( +sqlContext: SQLContext, +consumerStrategy: ConsumerStrategy[Array[Byte], Array[Byte]], +executorKafkaParams: ju.Map[String, Object], +sourceOptions: Map[String, String]) + extends Source with Logging { + + @transient private val consumer = consumerStrategy.createConsumer() + @transient private val sc = sqlContext.sparkContext + @transient private val initialPartitionOffsets = fetchPartitionOffsets(seekToLatest = false) + logInfo(s"Initial offsets: " + initialPartitionOffsets) + + override def schema: StructType = KafkaSource.kafkaSchema + + /** Returns the maximum available offset for this source. */ + override def getOff
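The `getBatch()` description quoted above implies half-open per-partition ranges: each partition is read from its start offset up to, but excluding, its end offset. A rough sketch of that range computation follows; the helper and class names are hypothetical, and how the real source initializes newly discovered partitions may differ.

```scala
// Hypothetical helper (not from the PR) deriving half-open [from, until) ranges per
// partition from start and end offset maps.
import org.apache.kafka.common.TopicPartition

case class SketchOffsetRange(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long) {
  def size: Long = untilOffset - fromOffset  // records in this partition's batch
}

def sketchRanges(
    start: Map[TopicPartition, Long],
    end: Map[TopicPartition, Long]): Seq[SketchOffsetRange] =
  end.toSeq.map { case (tp, until) =>
    // Partitions absent from the start map get an empty range in this sketch; the
    // real source may resolve newly added partitions differently.
    SketchOffsetRange(tp, start.getOrElse(tp, until), until)
  }
```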
[GitHub] spark pull request #15102: [SPARK-17346][SQL] Add Kafka source for Structure...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15102#discussion_r79089396 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala --- @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer + +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.kafka010.KafkaSource._ +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types._ +import org.apache.spark.SparkContext + +/** + * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design + * for this source is as follows. + * + * - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent + * with the semantics of `KafkaConsumer.position()`. + * + * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read + * by this source. These strategies directly correspond to the different consumption options + * in . This class is designed to return a configured + * [[KafkaConsumer]] that is used by the [[KafkaSource]] to query for the offsets. + * See the docs on [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for + * more details. + * + * - The [[KafkaSource]] written to do the following. + * + * - As soon as the source is created, the pre-configured KafkaConsumer returned by the + *[[ConsumerStrategy]] is used to query the initial offsets that this source should + *start reading from. This used to create the first batch. + * + * - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are + * returned as a [[KafkaSourceOffset]]. + * + * - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset' in + * for each partition. The end offset is excluded to be consistent with the semantics of + * [[KafkaSourceOffset]] and `KafkaConsumer.position()`. 
+ * + * - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the + * data from Kafka topic + partition is consistently read by the same executors across + * batches, and cached KafkaConsumers in the executors can be reused efficiently. See the + * docs on [[KafkaSourceRDD]] for more details. + */ +private[kafka010] case class KafkaSource( +sqlContext: SQLContext, +consumerStrategy: ConsumerStrategy[Array[Byte], Array[Byte]], +executorKafkaParams: ju.Map[String, Object], +sourceOptions: Map[String, String]) + extends Source with Logging { + + @transient private val consumer = consumerStrategy.createConsumer() + @transient private val sc = sqlContext.sparkContext + @transient private val initialPartitionOffsets = fetchPartitionOffsets(seekToLatest = false) + logInfo(s"Initial offsets: " + initialPartitionOffsets) + + override def schema: StructType = KafkaSource.kafkaSchema + + /** Returns the maximum available offset for this source. */ + override def getOff
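The quoted scaladoc also describes ConsumerStrategy as the piece that decides which topics and partitions are read and hands back an already-configured KafkaConsumer. A rough sketch of that idea, with assumed trait and class names rather than the PR's exact API:

```scala
// Rough sketch of the ConsumerStrategy idea; SketchConsumerStrategy and
// SketchSubscribeStrategy are assumptions, not the PR's API.
import java.{util => ju}
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener

trait SketchConsumerStrategy[K, V] {
  // Returns a consumer that is already subscribed/assigned and ready for offset queries.
  def createConsumer(): KafkaConsumer[K, V]
}

// Strategy corresponding to subscribing to a fixed list of topics; Kafka decides which
// partitions this consumer sees.
case class SketchSubscribeStrategy[K, V](
    topics: Seq[String],
    kafkaParams: ju.Map[String, Object]) extends SketchConsumerStrategy[K, V] {

  override def createConsumer(): KafkaConsumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.subscribe(topics.asJava, new NoOpConsumerRebalanceListener())
    consumer
  }
}
```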
[GitHub] spark pull request #15102: [SPARK-17346][SQL] Add Kafka source for Structure...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15102#discussion_r79088295 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala --- @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer + +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.kafka010.KafkaSource._ +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types._ +import org.apache.spark.SparkContext + +/** + * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design --- End diff -- nit: "read" data
[GitHub] spark pull request #15102: [SPARK-17346][SQL] Add Kafka source for Structure...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15102#discussion_r79088253 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} +import org.apache.kafka.common.{KafkaException, TopicPartition} + +import org.apache.spark.internal.Logging + + +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. + */ +class CachedKafkaConsumer[K, V] private( --- End diff -- I mentioned this in a larger comment, but we might want to consider removing these. I think that this source should probably always pass data to execution as bytes (maybe even copying them directly in tungsten rows eventually).
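A sketch of the "pass data to execution as bytes" idea from the comment above: the source would always emit BinaryType key and value columns and leave deserialization to the query. The schema below is an assumption for illustration, not necessarily the schema this PR ends up with.

```scala
// Assumed schema for illustration only: raw bytes for key/value, metadata columns alongside.
import org.apache.spark.sql.types._

val sketchKafkaSchema = StructType(Seq(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType)))

// A downstream query would then decode the raw bytes itself, e.g.:
//   df.selectExpr("CAST(value AS STRING) AS value_str")
```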
[GitHub] spark pull request #15102: [SPARK-17346][SQL] Add Kafka source for Structure...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/15102#discussion_r79089541 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala --- @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer + +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.kafka010.KafkaSource._ +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types._ +import org.apache.spark.SparkContext + +/** + * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to reads data from Kafka. The design + * for this source is as follows. + * + * - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains + * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For + * example if the last record in a Kafka topic "t", partition 2 is offset 5, then + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent + * with the semantics of `KafkaConsumer.position()`. + * + * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read + * by this source. These strategies directly correspond to the different consumption options + * in . This class is designed to return a configured + * [[KafkaConsumer]] that is used by the [[KafkaSource]] to query for the offsets. + * See the docs on [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for + * more details. + * + * - The [[KafkaSource]] written to do the following. + * + * - As soon as the source is created, the pre-configured KafkaConsumer returned by the + *[[ConsumerStrategy]] is used to query the initial offsets that this source should + *start reading from. This used to create the first batch. + * + * - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are + * returned as a [[KafkaSourceOffset]]. + * + * - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset' in + * for each partition. The end offset is excluded to be consistent with the semantics of + * [[KafkaSourceOffset]] and `KafkaConsumer.position()`. 
+ * + * - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the + * data from Kafka topic + partition is consistently read by the same executors across + * batches, and cached KafkaConsumers in the executors can be reused efficiently. See the + * docs on [[KafkaSourceRDD]] for more details. + */ +private[kafka010] case class KafkaSource( +sqlContext: SQLContext, +consumerStrategy: ConsumerStrategy[Array[Byte], Array[Byte]], +executorKafkaParams: ju.Map[String, Object], +sourceOptions: Map[String, String]) + extends Source with Logging { + + @transient private val consumer = consumerStrategy.createConsumer() + @transient private val sc = sqlContext.sparkContext + @transient private val initialPartitionOffsets = fetchPartitionOffsets(seekToLatest = false) + logInfo(s"Initial offsets: " + initialPartitionOffsets) + + override def schema: StructType = KafkaSource.kafkaSchema + + /** Returns the maximum available offset for this source. */ + override def getOff
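Finally, the design notes say `getOffset()` uses the pre-configured KafkaConsumer to query the latest available offsets. A hypothetical helper (not the PR's code) showing one way to do that: seek to the end of every assigned partition and read back `position()`, which is 1 + the last available offset.

```scala
// Hypothetical helper illustrating a latest-offset query; not the PR's implementation.
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def sketchFetchLatestOffsets(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Map[TopicPartition, Long] = {
  consumer.poll(0)  // ensure the consumer has an assignment before asking for it
  val partitions = consumer.assignment().asScala.toSet
  consumer.seekToEnd(partitions.asJava)  // move to the end of each assigned partition
  partitions.map(tp => tp -> consumer.position(tp)).toMap
}
```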