[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r102338189 --- Diff: python/pyspark/streaming/kinesis.py --- @@ -37,7 +37,8 @@ class KinesisUtils(object): def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName, initialPositionInStream, checkpointInterval, storageLevel=StorageLevel.MEMORY_AND_DISK_2, - awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder): + awsAccessKeyId=None, awsSecretKey=None, stsAssumeRoleArn=None, + stsSessionName=None, stsExternalId=None, decoder=utf8_decoder): --- End diff -- Will do
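For context on what the three new parameters feed into: the sketch below shows, in Scala, how an assume-role ARN, session name, and external ID are typically turned into AWS credentials using the `aws-java-sdk-sts` library mentioned later in this thread. It is only an illustration of the STS mechanism the PR exposes, not the PR's actual wiring; the provider class and builder come from the AWS SDK, and the role, session, and external-ID values are placeholders.

```scala
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider

// Placeholder values; in the PR these would arrive via the new
// stsAssumeRoleArn / stsSessionName / stsExternalId arguments.
val roleArn = "arn:aws:iam::123456789012:role/cross-account-kinesis-read"
val sessionName = "spark-kinesis-session"
val externalId = "example-external-id"

// Build a credentials provider that calls sts:AssumeRole and refreshes the
// temporary credentials automatically.
val credentialsProvider = new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
  .withExternalId(externalId)
  .build()

// The resulting provider hands out temporary credentials for the other account.
val credentials = credentialsProvider.getCredentials()
```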
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r102338119 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -78,8 +70,9 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String) *See the Kinesis Spark Streaming documentation for more *details on the different types of checkpoints. * @param storageLevel Storage level to use for storing the received objects - * @param awsCredentialsOption Optional AWS credentials, used when user directly specifies - * the credentials + * @param kinesisCredsProvider SerializableCredentialsPRovider instance that will be used to --- End diff -- Fixed this locally, thanks
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 @brkyvz Anyone I can ping to help get this merged? The PR is going on a month old at this point and I know that lack of STS support is an issue that several interested parties would like to see resolved. I'll sleep better once I know this has made it into the master branch :) I can begin working on a Kinesis stream builder as a separate issue/PR but I'd like to see this merged before opening a new PR.
[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16944 Pinging participants from #16797 once more to get any feedback on the new proposal: @gatorsmile, @viirya, @ericl, @mallman and @cloud-fan
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 @brkyvz Just for clarification, can this PR be merged as-is with a separate Jira/PR for adding a builder interface or is the builder interface a prerequisite for merging this?
[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16944 @viirya I've updated the PR to include the initial catalog table checks you've suggested in the [```setupCaseSensitiveTable()```](https://github.com/apache/spark/pull/16944/files#diff-f3b945ffe3f3f57b520b655c8b918a80R124) helper method.
[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16944 retest this please
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101908155 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File +import java.util.concurrent.{Executors, TimeUnit} + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.execution.datasources.FileStatusCache +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ + +class HiveSchemaInferenceSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach { + + import HiveSchemaInferenceSuite._ + + override def beforeEach(): Unit = { +super.beforeEach() +FileStatusCache.resetForTesting() + } + + override def afterEach(): Unit = { +super.afterEach() +FileStatusCache.resetForTesting() + } + + private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + private val lowercaseSchema = StructType(Seq( +StructField("fieldone", LongType), +StructField("partcol1", IntegerType), +StructField("partcol2", IntegerType))) + private val caseSensitiveSchema = StructType(Seq( +StructField("fieldOne", LongType), +// Partition columns remain case-insensitive +StructField("partcol1", IntegerType), +StructField("partcol2", IntegerType))) + + // Create a CatalogTable instance modeling an external Hive Metastore table backed by + // Parquet data files. 
+ private def hiveExternalCatalogTable( + tableName: String, + location: String, + schema: StructType, + partitionColumns: Seq[String], + properties: Map[String, String] = Map.empty): CatalogTable = { +CatalogTable( + identifier = TableIdentifier(table = tableName, database = Option(DATABASE)), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat( +locationUri = Option(location), +inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), +outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), +serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"), +compressed = false, +properties = Map("serialization.format" -> "1")), + schema = schema, + provider = Option("hive"), + partitionColumnNames = partitionColumns, + properties = properties) + } + + // Creates CatalogTablePartition instances for adding partitions of data to our test table. + private def hiveCatalogPartition(location: String, index: Int): CatalogTablePartition += CatalogTablePartition( + spec = Map("partcol1" -> index.toString, "partcol2" -> index.toString), + storage = CatalogStorageFormat( +locationUri = Option(s"${location}/partCol1=$index/partCol2=$index/"), +inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), +outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), +
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101908105 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File +import java.util.concurrent.{Executors, TimeUnit} + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.execution.datasources.FileStatusCache +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ + +class HiveSchemaInferenceSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach { + + import HiveSchemaInferenceSuite._ + + override def beforeEach(): Unit = { +super.beforeEach() +FileStatusCache.resetForTesting() + } + + override def afterEach(): Unit = { +super.afterEach() +FileStatusCache.resetForTesting() + } + + private val externalCatalog = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + private val lowercaseSchema = StructType(Seq( +StructField("fieldone", LongType), +StructField("partcol1", IntegerType), +StructField("partcol2", IntegerType))) + private val caseSensitiveSchema = StructType(Seq( +StructField("fieldOne", LongType), +// Partition columns remain case-insensitive +StructField("partcol1", IntegerType), +StructField("partcol2", IntegerType))) + + // Create a CatalogTable instance modeling an external Hive Metastore table backed by + // Parquet data files. 
+ private def hiveExternalCatalogTable( + tableName: String, + location: String, + schema: StructType, + partitionColumns: Seq[String], + properties: Map[String, String] = Map.empty): CatalogTable = { +CatalogTable( + identifier = TableIdentifier(table = tableName, database = Option(DATABASE)), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat( +locationUri = Option(location), +inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), +outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), +serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"), +compressed = false, +properties = Map("serialization.format" -> "1")), + schema = schema, + provider = Option("hive"), + partitionColumnNames = partitionColumns, + properties = properties) + } + + // Creates CatalogTablePartition instances for adding partitions of data to our test table. + private def hiveCatalogPartition(location: String, index: Int): CatalogTablePartition += CatalogTablePartition( + spec = Map("partcol1" -> index.toString, "partcol2" -> index.toString), + storage = CatalogStorageFormat( +locationUri = Option(s"${location}/partCol1=$index/partCol2=$index/"), +inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), +outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), +
[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16944 Pinging @viirya and @ericl to take a look at the updates per their feedback
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 @brkyvz Fair enough. Let me know if there's anything I can do to help get this merged. I can also take a look at adding a builder class for Kinesis streams as a separate PR before the code freeze.
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 @brkyvz, @zsxwing - any update here? Worried that this PR is starting to languish.
[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16944 I've updated the PR based on feedback received. Changes from the previous commit:

- Fixed a couple of indentation issues
- Clarified some HiveSchemaInferenceSuite comments and did some general cleanup
- Added CatalogTable checks and a ```NEVER_INFER``` test to HiveSchemaInferenceSuite
- Added additional info/error logging to HiveMetastoreCatalog
- Catch nonfatal exceptions from the alterTable() call in HiveMetastoreCatalog
- Changed the param name to ```spark.sql.hive.caseSensitiveInferenceMode```
- Renamed ```CatalogTable.schemaFromTableProps``` to ```CatalogTable.schemaPreservesCase```
- Introduced the ```HiveCaseSensitiveInferenceMode``` enumeration type to get rid of the "magic strings" used for inference modes
- Use the updated CatalogTable record in LogicalRelation if ```INFER_AND_SAVE``` is used
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101625724 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -296,6 +296,21 @@ object SQLConf { .longConf .createWithDefault(250 * 1024 * 1024) + object HiveCaseSensitiveInferenceMode extends Enumeration { --- End diff -- Is there a more appropriate place I can put this Enumeration?
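For reference, a minimal sketch of what such an enumeration could look like; the object name and the three mode names are taken from this PR's discussion, while its placement (inside SQLConf versus a separate object) is exactly the open question above.

```scala
// Hypothetical standalone sketch; where this object should live is the
// question raised in the comment above.
object HiveCaseSensitiveInferenceMode extends Enumeration {
  val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
}

// Example of round-tripping a configuration string to a mode value.
val mode = HiveCaseSensitiveInferenceMode.withName("INFER_AND_SAVE")
```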
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101606197 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -186,8 +212,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log fileFormat = defaultSource, options = options)(sparkSession = sparkSession) - val created = LogicalRelation(relation, -catalogTable = Some(metastoreRelation.catalogTable)) + val created = LogicalRelation(relation, catalogTable = Some(catalogTable)) --- End diff -- This would only be appropriate if the mode is INFER_AND_SAVE and the call to ```alterTable()``` succeeds, right? I'll see if I can refactor this method to do this without making it too convoluted.
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101605728 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -296,6 +296,17 @@ object SQLConf { .longConf .createWithDefault(250 * 1024 * 1024) + val HIVE_SCHEMA_INFERENCE_MODE = buildConf("spark.sql.hive.schemaInferenceMode") --- End diff -- Will do
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101605711 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala --- @@ -163,6 +163,10 @@ case class BucketSpec( * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the * catalog. If false, it is inferred automatically based on file * structure. + * @param schemaFromTableProps Whether the schema field was obtained by parsing a case-sensitive --- End diff -- Will do
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101560890 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -161,23 +161,49 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log bucketSpec, Some(partitionSchema)) +val catalogTable = metastoreRelation.catalogTable val logicalRelation = cached.getOrElse { val sizeInBytes = metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong val fileIndex = { -val index = new CatalogFileIndex( - sparkSession, metastoreRelation.catalogTable, sizeInBytes) +val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes) if (lazyPruningEnabled) { index } else { index.filterPartitions(Nil) // materialize all the partitions in memory } } val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet - val dataSchema = -StructType(metastoreSchema + val filteredMetastoreSchema = StructType(metastoreSchema .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase))) + val inferenceMode = sparkSession.sessionState.conf.schemaInferenceMode + val dataSchema = if (inferenceMode != "NEVER_INFER" && + !catalogTable.schemaFromTableProps) { +val fileStatuses = fileIndex.listFiles(Nil).flatMap(_.files) +val inferred = defaultSource.inferSchema(sparkSession, options, fileStatuses) +val merged = if (fileType.equals("parquet")) { + inferred.map(ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, _)) +} else { + inferred --- End diff -- I took this from how the schema was inferred in HiveMetastoreCatalog prior to 2.1.0. Only ParquetFileFormat has a merge method.
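To illustrate what the Parquet-specific merge step is for: the schema inferred from the data files carries the case-sensitive field names, while the metastore schema is all lowercase, and the two have to be reconciled. The helper below is a hypothetical sketch of that idea only; it is not Spark's actual ParquetFileFormat.mergeMetastoreParquetSchema, and the function name is invented for illustration.

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical helper: keep the metastore's field order, but substitute the
// case-sensitive field from the inferred file schema whenever a
// case-insensitive match exists.
def reconcileFieldNames(metastoreSchema: StructType, inferredSchema: StructType): StructType = {
  // Index the inferred (case-sensitive) fields by their lowercased names.
  val inferredByLowerName = inferredSchema.map(f => f.name.toLowerCase -> f).toMap
  StructType(metastoreSchema.map { metastoreField =>
    inferredByLowerName.getOrElse(metastoreField.name.toLowerCase, metastoreField)
  })
}

// e.g. the metastore says "fieldone: bigint" while the Parquet footers say
// "fieldOne: bigint"; the reconciled schema keeps "fieldOne" so that
// case-sensitive Parquet reads resolve correctly.
```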
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101562475 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -296,6 +296,17 @@ object SQLConf { .longConf .createWithDefault(250 * 1024 * 1024) + val HIVE_SCHEMA_INFERENCE_MODE = buildConf("spark.sql.hive.schemaInferenceMode") +.doc("Configures the action to take when a case-sensitive schema cannot be read from a Hive " + + "table's properties. Valid options include INFER_AND_SAVE (infer the case-sensitive " + + "schema from the underlying data files and write it back to the table properties), " + + "INFER_ONLY (infer the schema but don't attempt to write it to the table properties) and " + + "NEVER_INFER (fallback to using the case-insensitive metastore schema instead of inferring).") +.stringConf +.transform(_.toUpperCase()) +.checkValues(Set("INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER")) +.createWithDefault("INFER_AND_SAVE") --- End diff -- I'll update the code to catch and log any nonfatal exception when performing the ```alterTable()``` to save the table schema when ```INFER_AND_SAVE``` is enabled.
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101461535 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -161,23 +161,49 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log bucketSpec, Some(partitionSchema)) +val catalogTable = metastoreRelation.catalogTable val logicalRelation = cached.getOrElse { val sizeInBytes = metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong val fileIndex = { -val index = new CatalogFileIndex( - sparkSession, metastoreRelation.catalogTable, sizeInBytes) +val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes) if (lazyPruningEnabled) { index } else { index.filterPartitions(Nil) // materialize all the partitions in memory } } val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet - val dataSchema = -StructType(metastoreSchema + val filteredMetastoreSchema = StructType(metastoreSchema .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase))) + val inferenceMode = sparkSession.sessionState.conf.schemaInferenceMode + val dataSchema = if (inferenceMode != "NEVER_INFER" && + !catalogTable.schemaFromTableProps) { +val fileStatuses = fileIndex.listFiles(Nil).flatMap(_.files) +val inferred = defaultSource.inferSchema(sparkSession, options, fileStatuses) --- End diff -- I'll add an info log here.
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101461357 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File +import java.util.concurrent.{Executors, TimeUnit} + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.execution.datasources.FileStatusCache +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ + +class HiveSchemaInferenceSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach { + + import HiveSchemaInferenceSuite._ + + // Create a CatalogTable instance modeling an external Hive table in a metastore that isn't + // controlled by Spark (i.e. has no Spark-specific table properties set). + private def hiveExternalCatalogTable( + tableName: String, + location: String, + schema: StructType, + partitionColumns: Seq[String], + properties: Map[String, String] = Map.empty): CatalogTable = { +CatalogTable( + identifier = TableIdentifier(table = tableName, database = Option("default")), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat( +locationUri = Option(location), +inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), +outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), +serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"), +compressed = false, +properties = Map("serialization.format" -> "1")), + schema = schema, + provider = Option("hive"), + partitionColumnNames = partitionColumns, + properties = properties) + } + + // Creates CatalogTablePartition instances for adding partitions of data to our test table. 
+ private def hiveCatalogPartition(location: String, index: Int): CatalogTablePartition += CatalogTablePartition( + spec = Map("partcol1" -> index.toString, "partcol2" -> index.toString), + storage = CatalogStorageFormat( +locationUri = Option(s"${location}/partCol1=$index/partCol2=$index/"), +inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), +outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), +serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"), +compressed = false, +properties = Map("serialization.format" -> "1"))) + + // Creates a case-sensitive external Hive table for testing schema inference options. Table + // will not have Spark-specific table properties set. + private def setupCaseSensitiveTable( + tableName: String, + dir: File): Unit = { +spark.range(NUM_RECORDS) + .selectExpr("id as fieldOne", "id as partCol1", "id as partCol2") + .write + .partitionBy("partCol1", "partCol2") + .mode("overwrite") + .parquet(dir.getAbsolutePath) + +val lowercaseSchema = StructType(Seq( + StructField("fieldone&
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101461155 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File +import java.util.concurrent.{Executors, TimeUnit} + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.execution.datasources.FileStatusCache +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ + +class HiveSchemaInferenceSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach { + + import HiveSchemaInferenceSuite._ + + // Create a CatalogTable instance modeling an external Hive table in a metastore that isn't + // controlled by Spark (i.e. has no Spark-specific table properties set). --- End diff -- I wrote the method to take arbitrary properties but for the purposes of this test only an empty map is supplied. I'll make the comment more applicable to the method though and describe the usage of it elsewhere.
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101460842 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -161,23 +161,49 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log bucketSpec, Some(partitionSchema)) +val catalogTable = metastoreRelation.catalogTable val logicalRelation = cached.getOrElse { val sizeInBytes = metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong val fileIndex = { -val index = new CatalogFileIndex( - sparkSession, metastoreRelation.catalogTable, sizeInBytes) +val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes) if (lazyPruningEnabled) { index } else { index.filterPartitions(Nil) // materialize all the partitions in memory } } val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet - val dataSchema = -StructType(metastoreSchema + val filteredMetastoreSchema = StructType(metastoreSchema .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase))) --- End diff -- I'll fix both of these
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16944#discussion_r101460565 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -296,6 +296,17 @@ object SQLConf { .longConf .createWithDefault(250 * 1024 * 1024) + val HIVE_SCHEMA_INFERENCE_MODE = buildConf("spark.sql.hive.schemaInferenceMode") +.doc("Configures the action to take when a case-sensitive schema cannot be read from a Hive " + + "table's properties. Valid options include INFER_AND_SAVE (infer the case-sensitive " + + "schema from the underlying data files and write it back to the table properties), " + + "INFER_ONLY (infer the schema but don't attempt to write it to the table properties) and " + + "NEVER_INFER (fallback to using the case-insensitive metastore schema instead of inferring).") +.stringConf +.transform(_.toUpperCase()) +.checkValues(Set("INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER")) +.createWithDefault("INFER_AND_SAVE") --- End diff -- This was proposed in #16797 but I'd like to open this for discussion.

- ```INFER_ONLY``` would mimic the pre-2.1.0 behavior.
- ```INFER_AND_SAVE``` would attempt to prevent future inferences but may fail if the Hive client doesn't have write permissions on the metastore.
- ```NEVER_INFER``` is the current behavior in 2.1.0, which breaks support for the tables affected by [SPARK-19611](https://issues.apache.org/jira/browse/SPARK-19611). Users may wish to enable this mode for tables without the table properties schema that they know are case-insensitive.
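To make the three modes concrete, here is a small sketch of how a user might select one at session startup. It uses the configuration key as named at this point in the review (```spark.sql.hive.schemaInferenceMode```); later comments in this thread rename it to ```spark.sql.hive.caseSensitiveInferenceMode```, so treat the key as illustrative rather than final.

```scala
import org.apache.spark.sql.SparkSession

// Opt out of writing the inferred schema back to the metastore, e.g. when the
// Hive client only has read permissions. The key name follows this revision of
// the PR and may differ in the merged version.
val spark = SparkSession.builder()
  .appName("schema-inference-mode-example")
  .config("spark.sql.hive.schemaInferenceMode", "INFER_ONLY")
  .enableHiveSupport()
  .getOrCreate()

// The mode can also be changed on an existing session:
spark.conf.set("spark.sql.hive.schemaInferenceMode", "NEVER_INFER")
```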
[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16944 Looks like I missed a Catalyst test. Updating the PR.
[GitHub] spark issue #16942: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16942 @mallman If I did close it then it was by mistake. The "Reopen and comment" button was disabled with a message about the PR being closed by a force push when I hovered over it. Afraid I'm a bit of a n00b on GitHub PRs :/
[GitHub] spark issue #16944: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16944 Re-pinging participants from #16797: @gatorsmile, @viirya, @ericl, @mallman and @cloud-fan. Sorry for the noise.
[GitHub] spark pull request #16944: [SPARK-19611][SQL] Introduce configurable table s...
GitHub user budde opened a pull request: https://github.com/apache/spark/pull/16944 [SPARK-19611][SQL] Introduce configurable table schema inference

*Update: Accidentally broke #16942 via a force push. Opening a replacement PR.*

Replaces #16797. See the discussion in that PR for more details/justification for this change.

## Summary of changes

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

- Add the spark.sql.hive.schemaInferenceMode param to SQLConf
- Add a schemaFromTableProps field to CatalogTable (set to true when the schema is successfully read from table props)
- Perform schema inference in HiveMetastoreCatalog if schemaFromTableProps is false, depending on spark.sql.hive.schemaInferenceMode
- Update table metadata properties in HiveExternalCatalog.alterTable()
- Add HiveSchemaInferenceSuite tests

## How was this patch tested?

The tests in HiveSchemaInferenceSuite should verify that schema inference is working as expected.

## Open issues

- The option values for ```spark.sql.hive.schemaInferenceMode``` (e.g. "INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER") should be made into constants or an enum. I couldn't find a sensible object to place them in that doesn't introduce a dependency between sql/core and sql/hive, though.
- Should "INFER_AND_SAVE" be the default behavior? This restores the out-of-the-box compatibility that was present prior to 2.1.0 but changes the behavior of 2.1.0 (which is essentially "NEVER_INFER").
- Is ```HiveExternalCatalog.alterTable()``` the appropriate place to write back the table metadata properties outside of createTable()? Should a new external catalog method like updateTableMetadata() be introduced?
- All partition columns will still be treated as case-insensitive even after inferring. As far as I remember, this has always been the case with schema inference prior to Spark 2.1.0 and I haven't made any attempt to reconcile this, since it doesn't cause the same problems that case-sensitive data fields do. Should we attempt to restore case sensitivity by inspecting file paths or leave this as-is?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/budde/spark SPARK-19611

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16944.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16944
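For readers coming to this PR fresh, the underlying problem (SPARK-19611) is that Parquet files store case-sensitive field names while the Hive metastore lowercases column names, so a metastore-only schema can no longer resolve mixed-case Parquet fields. Below is a minimal sketch of that situation, modeled loosely on the PR's own test setup; the table name, path, and column names are illustrative, and it assumes a Hive-enabled SparkSession.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Write Parquet data whose field name is case sensitive ("fieldOne").
spark.range(10)
  .selectExpr("id AS fieldOne")
  .write
  .mode("overwrite")
  .parquet("/tmp/case_sensitive_demo")

// Register it as an external Hive table; the metastore lowercases the column
// name to "fieldone", and the original case is lost.
spark.sql("""
  CREATE EXTERNAL TABLE case_sensitive_demo (fieldone BIGINT)
  STORED AS PARQUET
  LOCATION '/tmp/case_sensitive_demo'
""")

// Without schema inference the Parquet reader looks for a field literally named
// "fieldone", finds only "fieldOne" in the files, and the column reads back as
// null; the INFER_* modes recover the case-sensitive schema from the data files.
spark.table("case_sensitive_demo").show()
```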
[GitHub] spark issue #16942: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16942 Accidentally did a force-push to my branch for this issue. Looks like I'll have to open a new PR...
[GitHub] spark pull request #16942: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde closed the pull request at: https://github.com/apache/spark/pull/16942
[GitHub] spark issue #16942: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16942 Tests appear to be failing due to the following error:

```
[info] Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.streaming.FileStreamSourceSuite *** ABORTED *** (0 milliseconds)
[info] org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.sql.execution.SQLExecutionSuite$$anonfun$3.apply(SQLExecutionSuite.scala:107)
...
```

I don't think anything in this PR should've changed the behavior of core SQL tests, but I'll look into this.
[GitHub] spark pull request #16942: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16942#discussion_r101366583 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -296,6 +296,17 @@ object SQLConf { .longConf .createWithDefault(250 * 1024 * 1024) + val HIVE_SCHEMA_INFERENCE_MODE = buildConf("spark.sql.hive.schemaInferenceMode") +.doc("Configures the action to take when a case-sensitive schema cannot be read from a Hive " + + "table's properties. Valid options include INFER_AND_SAVE (infer the case-sensitive " + + "schema from the underlying data files and write it back to the table properties), " + + "INFER_ONLY (infer the schema but don't attempt to write it to the table properties) and " + + "NEVER_INFER (fallback to using the case-insensitive metastore schema instead of inferring).") +.stringConf +.transform(_.toUpperCase()) +.checkValues(Set("INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER")) +.createWithDefault("INFER_AND_SAVE") --- End diff -- I'm open for discussion on whether or not this should be the default behavior.
[GitHub] spark pull request #16942: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16942#discussion_r101366441 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -296,6 +296,17 @@ object SQLConf { .longConf .createWithDefault(250 * 1024 * 1024) + val HIVE_SCHEMA_INFERENCE_MODE = buildConf("spark.sql.hive.schemaInferenceMode") +.doc("Configures the action to take when a case-sensitive schema cannot be read from a Hive " + + "table's properties. Valid options include INFER_AND_SAVE (infer the case-sensitive " + + "schema from the underlying data files and write it back to the table properties), " + + "INFER_ONLY (infer the schema but don't attempt to write it to the table properties) and " + + "NEVER_INFER (fallback to using the case-insensitive metastore schema instead of inferring).") +.stringConf +.transform(_.toUpperCase()) +.checkValues(Set("INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER")) --- End diff -- As mentioned in the PR, I'm looking for a good place to store these values as constants or an enum.
[GitHub] spark pull request #16942: [SPARK-19611][SQL] Introduce configurable table s...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16942#discussion_r101366307 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File +import java.util.concurrent.{Executors, TimeUnit} + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.execution.datasources.FileStatusCache +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ + +class HiveSchemaInferenceSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach { + + import HiveSchemaInferenceSuite._ + + // Create a CatalogTable instance modeling an external Hive table in a metastore that isn't + // controlled by Spark (i.e. has no Spark-specific table properties set). + private def hiveExternalCatalogTable( + tableName: String, + location: String, + schema: StructType, + partitionColumns: Seq[String], + properties: Map[String, String] = Map.empty): CatalogTable = { +CatalogTable( + identifier = TableIdentifier(table = tableName, database = Option("default")), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat( +locationUri = Option(location), +inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), +outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), +serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"), +compressed = false, +properties = Map("serialization.format" -> "1")), + schema = schema, + provider = Option("hive"), + partitionColumnNames = partitionColumns, + properties = properties) + } + + // Creates CatalogTablePartition instances for adding partitions of data to our test table. 
+ private def hiveCatalogPartition(location: String, index: Int): CatalogTablePartition += CatalogTablePartition( + spec = Map("partcol1" -> index.toString, "partcol2" -> index.toString), + storage = CatalogStorageFormat( +locationUri = Option(s"${location}/partCol1=$index/partCol2=$index/"), +inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), +outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), +serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"), +compressed = false, +properties = Map("serialization.format" -> "1"))) + + // Creates a case-sensitive external Hive table for testing schema inference options. Table + // will not have Spark-specific table properties set. + private def setupCaseSensitiveTable( + tableName: String, + dir: File): Unit = { +spark.range(NUM_RECORDS) + .selectExpr("id as fieldOne", "id as partCol1", "id as partCol2") + .write + .partitionBy("partCol1", "partCol2") + .mode("overwrite") + .parquet(dir.getAbsolutePath) + +val lowercaseSchema = StructType(Seq( + StructField("fieldone&
[GitHub] spark issue #16942: [SPARK-19611][SQL] Introduce configurable table schema i...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16942 Pinging participants from #16797: @gatorsmile, @viirya, @ericl, @mallman and @cloud-fan
[GitHub] spark pull request #16942: [SPARK-19611][SQL] Introduce configurable table s...
GitHub user budde opened a pull request: https://github.com/apache/spark/pull/16942 [SPARK-19611][SQL] Introduce configurable table schema inference

Replaces #16797. See the discussion in that PR for more details/justification for this change.

## Summary of changes

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

- Add the spark.sql.hive.schemaInferenceMode param to SQLConf
- Add a schemaFromTableProps field to CatalogTable (set to true when the schema is successfully read from table props)
- Perform schema inference in HiveMetastoreCatalog if schemaFromTableProps is false, depending on spark.sql.hive.schemaInferenceMode
- Update table metadata properties in HiveExternalCatalog.alterTable()
- Add HiveSchemaInferenceSuite tests

## How was this patch tested?

The tests in HiveSchemaInferenceSuite should verify that schema inference is working as expected.

## Open issues

- The option values for ```spark.sql.hive.schemaInferenceMode``` (e.g. "INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER") should be made into constants or an enum. I couldn't find a sensible object to place them in that doesn't introduce a dependency between sql/core and sql/hive, though.
- Should "INFER_AND_SAVE" be the default behavior? This restores the out-of-the-box compatibility that was present prior to 2.1.0 but changes the behavior of 2.1.0 (which is essentially "NEVER_INFER").
- Is ```HiveExternalCatalog.alterTable()``` the appropriate place to write back the table metadata properties outside of createTable()? Should a new external catalog method like updateTableMetadata() be introduced?
- All partition columns will still be treated as case-insensitive even after inferring. As far as I remember, this has always been the case with schema inference prior to Spark 2.1.0 and I haven't made any attempt to reconcile this, since it doesn't cause the same problems that case-sensitive data fields do. Should we attempt to restore case sensitivity by inspecting file paths or leave this as-is?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/budde/spark SPARK-19611

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16942.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16942

commit ced9c4d8363fb4e10e027da5a793ceabed11cfb7
Author: Budde <bu...@amazon.com>
Date: 2017-02-08T04:46:48Z

    [SPARK-19611][SQL] Introduce configurable table schema inference

    - Add spark.sql.hive.schemaInferenceMode param to SQLConf
    - Add schemaFromTableProps field to CatalogTable (set to true when schema is successfully read from table props)
    - Perform schema inference in HiveMetastoreCatalog if schemaFromTableProps is false, depending on spark.sql.hive.schemaInferenceMode.
    - Update table metadata properties in HiveExternalCatalog.alterTable()
    - Add HiveSchemaInferenceSuite tests
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797 Thanks for all the feedback on this PR, folks. I'm going to close this PR/JIRA and open new ones for enabling configurable schema inference as a fallback. I'll ping each of you who has been active in this discussion on the new PR once it is open. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16797: [SPARK-19455][SQL] Add option for case-insensitiv...
Github user budde closed the pull request at: https://github.com/apache/spark/pull/16797 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 @brkyvz Any thoughts on moving the dependency version bump to a new commit and backporting to 2.1.1 with the previous versions? @zswing Any chance you could take a look at this sometime this week? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 @brkyvz Would it be possible to backport to 2.1.1 if I reverted to the old version of the KCL and made the dependency upgrade as a separate PR? We'd still be adding ```aws-java-sdk-sts``` as a dependency but its version would be consistent with the other AWS Java SDK artifacts that are getting pulled in as part of the Kinesis Client Library dependencies. I think moving to the builder pattern makes sense as a separate PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 @brkyvz Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797 @mallman The Parquet schema merging methods take me back to #5214 :) I haven't been following changes here very closely, but I would guess use of this method was replaced by the previously-used call to ```ParquetFileFormat.inferSchema()```. But I think it is important to point out that this functionality was explicitly added to support case sensitivity differences. In regard to the JIRA, I'll either modify it or replace it with a new one for bringing back (configurable) inference. I can mark it a "bug" at that point. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Pinging @brkyvz and @srowen once more for a final look and to get Jenkins to retest the latest update (not sure if this still requires Jenkins admin rights). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797 @cloud-fan:

> Spark does support mixed-case-schema tables, and it has always been. It's because we write table schema to metastore case-preserving, via table properties.

Spark prior to 2.1 supported *any* case-sensitive table, regardless of what table properties are set. Spark 2.1 supports these tables if and only if Spark 2.1 was used to create them and embedded the schema as a metadata property.

> So the data file schema must match the table schema, or Spark will fail, it has always been.

This is absolutely not how it's always been. Spark would infer the schema from the source files and use that schema when constructing a logical relation. We've been relying on this behavior for years.

> For any table, as long as hive can read it, Spark can read it.

I've double checked this and Hive can query against tables backed by case-sensitive Parquet files. Spark 2.1 is currently the only Hive-compatible query engine I'm familiar with that won't support this usecase.

> But we supported it, with the cost of runtime schema inference.

My argument is that it should be possible to fall back to this level of support if the properties aren't present.

> This problem was solved in Spark 2.1, by writing table schema to metastore case-preserving for hive serde tables. Now we can say that, the data schema must match the table schema, or Spark should fail.

Spark does not explicitly fail in this case. It falls back to the downcased metastore schema, which will silently fail and return 0 results if a case-sensitive field name is used in your projection or filter predicate.

> That's why I prefer the migration command approach, it keeps the concept clean: data schema must match table schema.

This links Spark upgrades to potentially-costly data migrations. From an end-user perspective, prior to 2.1 you could simply point Spark SQL to an external Hive metastore and query any data in it. Now you have to make sure the table has been migrated to the appropriate version of Spark or your queries may silently return incorrect results.

The migration approach also assumes that Spark has write access to the Hive metastore it is querying against. If you have read-only access to a metastore administered by another team or organization, you are at their mercy to run migrations on your behalf against the latest version of Spark in order to allow you to query their tables from Spark. I think anybody who's ever found themselves in a similar situation can attest that it's never good to be beholden to someone else to enable a feature that only matters to you. And again, in some cases migrating all tables in a large Hive warehouse could be an extremely expensive operation that potentially touches petabytes of data.

> Like you said, users can still create a hive table with mixed-case-schema parquet/orc files, by hive or other systems like presto. This table is readable for hive, and for Spark prior to 2.1, because of the runtime schema inference But this is not intentional, and Spark should not support it as the data file schema and table schema mismatch. We can make the migration command cover this case too.

I will continue to argue strongly against reducing the number of usecases Spark SQL supports out of the box. While a migration command can be a helpful optimization, I don't think it is acceptable as the only option, for the reasons I've detailed here.
Simply put, I think relying on the presence of Spark-specific key/value pairs in the table properties in order for Spark SQL to function properly and assuming that Spark (or Spark users) can easily alter those properties to add the table schema is too brittle for large-scale production use. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
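To make the failure mode concrete, a minimal reproduction of the behavior described above (hypothetical paths and column names; assumes a Hive-enabled SparkSession on Spark 2.1.0):

```scala
// Write Parquet files with a mixed-case field, then register an external Hive
// table over them. The metastore stores the column name lowercased; without
// runtime schema inference (or a Spark-written schema property), Spark 2.1
// resolves columns against the lowercased schema, the Parquet reader finds no
// matching field, and the query silently returns 0 rows instead of failing.
spark.range(10).selectExpr("id AS eventId").write.parquet("/tmp/events")

spark.sql("""
  CREATE EXTERNAL TABLE events (eventId BIGINT)
  STORED AS PARQUET
  LOCATION '/tmp/events'
""")

spark.sql("SELECT eventId FROM events WHERE eventId > 0").count()
// Spark < 2.1.0 (with inference): 9; Spark 2.1.0: 0
```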
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797

> For better user experience, we should automatically infer the schema and write it back to metastore, if there is no case-sensitive table schema in metastore. This has the cost of detection the need of schema inference, and complicating table read code path.

Totally agree. I think the default behavior should be to infer and backfill a case-sensitive schema into the table properties if one isn't already there. An option should also be provided to disable all inference and just fall back to the case-insensitive metastore schema if none is found (i.e. the current behavior in 2.1.0).

> If this is only a compatibility issue, I think it's fair to ask the cluster maintainers to run some commands after upgrade Spark cluster. Even there are a lot of tables, it's easy to write a script to automate it.

I don't think this is fair. For one, as I've mentioned, in some cases Spark may not be the tool being used to maintain the metastore. This will now require the warehouse admins to set up a Spark cluster and run these migration commands on every table with case-sensitive underlying data if they'd like them to be accessible from Spark. As a second point, while writing an automation script may be trivial, the execution costs aren't, especially if the data is stored in a format like JSON where each and every record in the table must be read in order to infer the schema.

> If there is no Spark specific table properties, we assume this table is created by hive(not by external systems like Presto), so the schema of parquet files should be all lowercased.

This isn't an assumption made by Spark prior to 2.1.0, whether this was an explicit decision or not. All I'm asking for is a way to configure Spark to continue supporting a use case it has supported for years. Also, in our case, the table was created by Spark, not Presto. Presto is just an example of another execution engine we've put in front of our warehouse that hasn't had a problem with the underlying Parquet data being case-sensitive. We just used an older version of Spark to create the tables. I would think long and hard about whether requiring warehouse admins to run potentially-costly migrations between Spark versions to update table metadata is a preferable option to offering a way to remain backwards-compatible with the old behavior. Again, I think introducing a mechanism to migrate the table properties is a good idea. I just don't think it should be the only option.

> Another proposal is to make parquet reader case-insensitive, so that we can solve this problem without schema inference. But the problem is, Spark can be configured to be case-sensitive, so that it's possible to write such a schema (conflicting columns after lower-casing) into metastore. I think this proposal is the best if we can totally make Spark case-insensitive.

I don't think this would be a bad option if it could be enabled at the Parquet level, but it seems their work towards enabling case-insensitive file access has stalled. As @ericl pointed out above, moving this to the ParquetReadSupport level may make the situation better for Parquet but the behavior won't be consistent across file formats like ORC or JSON. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Looks like Jenkins is failing to build any recent PR due to the following error: ```[error] Could not find hadoop2.3 in the list. Valid options are ['hadoop2.6', 'hadoop2.7']``` I would guess this is related to [this commit](https://github.com/apache/spark/commit/e8d3fca4502d5f5b8f38525b5fdabe80ccf9a8ec). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Amending the PR again to fix a new dependency conflict in spark/pom.xml. Thanks again for taking the time to review this, @brkyvz and @srowen. Please let me know if you feel any additional changes are needed before this is ready to merge. Since this doesn't break any existing APIs, I think it would make some people happy if we could get this into the 2.1.1 release. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797

> is it a completely compatibility issue? Seems like the only problem is, when we write out mixed-case-schema parquet files directly, and create an external table pointing to these files with Spark prior to 2.1, then read this table with Spark 2.1+.

Fundamentally, I wouldn't make the assumption that Spark is being used to create and maintain the tables in the Hive Metastore that Spark is querying against. We're currently using Spark to add and update metastore tables in our usecase, but I don't think Spark should make any assumptions about how the table was created or what properties may be set. In regard to the underlying issue, we've been using Spark in production for over two years and have several petabytes of case-sensitive Parquet data we've both written and queried using Spark. As of Spark 2.1, we are no longer able to use Spark to query any of this data, as any query containing a case-sensitive field name will return 0 results. I would argue this is a compatibility regression.

> For tables in hive, as long as long hive can read it, Spark should be able to read it too.

In our case, other Hive-compatible query engines like Presto don't have a problem with case-sensitive Parquet files. I haven't tried Hive itself in a long time but as far as I remember we didn't have a problem there either. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Amending the PR per review feedback. The issue around using an optional stsExternalId argument in ```KinesisUtils.createStream()``` remains open. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99909144 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -123,9 +123,143 @@ object KinesisUtils { // scalastyle:on val cleanedHandler = ssc.sc.clean(messageHandler) ssc.withNamedScope("kinesis stream") { + val kinesisCredsProvider = BasicCredentialsProvider( +awsAccessKeyId = awsAccessKeyId, +awsSecretKey = awsSecretKey) new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, -cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))) +cleanedHandler, kinesisCredsProvider) +} + } + + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. + * + * @param ssc StreamingContext object + * @param kinesisAppName Kinesis application name used by the Kinesis Client Library + *(KCL) to update DynamoDB + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName Name of region used by the Kinesis Client Library (KCL) to update + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + *See the Kinesis Spark Streaming documentation for more + *details on the different types of checkpoints. + * @param storageLevel Storage level to use for storing the received objects. + * StorageLevel.MEMORY_AND_DISK_2 is recommended. + * @param messageHandler A custom message handler that can generate a generic output from a + * Kinesis `Record`, which contains both message data, and metadata. + * @param stsAssumeRoleArn ARN of IAM role to assume when using STS sessions to read from + * Kinesis stream. + * @param stsSessionName Name to uniquely identify STS sessions if multiple princples assume + * the same role. + * @param stsExternalId External ID that can be used to validate against the assumed IAM role's + * trust policy. + * + * @note The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain + * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain + * gets the AWS credentials. + */ + // scalastyle:off + def createStream[T: ClassTag]( + ssc: StreamingContext, + kinesisAppName: String, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointInterval: Duration, + storageLevel: StorageLevel, + messageHandler: Record => T, + stsAssumeRoleArn: String, + stsSessionName: String, + stsExternalId: String): ReceiverInputDStream[T] = { --- End diff -- To expand on this a little bit, using a builder pattern will also make it feasible to specificy a different SerializableCredentialsProvider for each of the three AWS services the KCL uses (Kinesis, DynamoDB and CloudWatch). 
This really won't be feasible under the current approach of extending the single ```KinesisUtils.createStream()``` API. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
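As a self-contained illustration of that direction, here is a toy sketch of a builder carrying one credentials setting per AWS service (none of these names exist in Spark or in this PR; it only shows the shape of the API being suggested):

```scala
// Toy sketch only: each AWS service the KCL talks to gets its own credentials
// specification, with sensible defaults, instead of an ever-growing
// createStream() signature.
sealed trait CredentialsSpec
case object DefaultCreds extends CredentialsSpec
final case class StsCreds(roleArn: String, sessionName: String, externalId: Option[String] = None)
  extends CredentialsSpec

final case class KinesisStreamBuilder(
    streamName: String = "",
    kinesisCreds: CredentialsSpec = DefaultCreds,
    dynamoDBCreds: CredentialsSpec = DefaultCreds,
    cloudWatchCreds: CredentialsSpec = DefaultCreds) {
  def withStreamName(name: String): KinesisStreamBuilder = copy(streamName = name)
  def withKinesisCreds(c: CredentialsSpec): KinesisStreamBuilder = copy(kinesisCreds = c)
  def withDynamoDBCreds(c: CredentialsSpec): KinesisStreamBuilder = copy(dynamoDBCreds = c)
  def withCloudWatchCreds(c: CredentialsSpec): KinesisStreamBuilder = copy(cloudWatchCreds = c)
}

// Read the stream cross-account via STS while keeping the KCL's DynamoDB lease
// table and CloudWatch metrics in the local account:
val builder = KinesisStreamBuilder()
  .withStreamName("events")
  .withKinesisCreds(StsCreds("arn:aws:iam::123456789012:role/cross-account-read", "spark-kinesis"))
```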
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99908239 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -34,11 +35,56 @@ import org.apache.spark.streaming.Duration import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} import org.apache.spark.util.Utils -private[kinesis] -case class SerializableAWSCredentials(accessKeyId: String, secretKey: String) - extends AWSCredentials { - override def getAWSAccessKeyId: String = accessKeyId - override def getAWSSecretKey: String = secretKey +/** + * Serializable interface providing a method executors can call to obtain an + * AWSCredentialsProvider instance for authenticating to AWS services. + */ +private[kinesis] trait SerializableCredentialsProvider extends Serializable { + /** + * Return an AWSCredentialProvider instance that can be used by the Kinesis Client + * Library to authenticate to AWS services (Kinesis, CloudWatch and DynamoDB). + */ + def provider: AWSCredentialsProvider +} + +/** Returns DefaultAWSCredentialsProviderChain for authentication. */ +private[kinesis] case object DefaultCredentialsProvider + extends SerializableCredentialsProvider { + + def provider: AWSCredentialsProvider = new DefaultAWSCredentialsProviderChain +} + +/** Returns AWSStaticCredentialsProvider constructed using basic AWS keypair. */ +private[kinesis] case class BasicCredentialsProvider( +awsAccessKeyId: String, +awsSecretKey: String) extends SerializableCredentialsProvider { + + def provider: AWSCredentialsProvider += new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)) --- End diff -- Will do --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99908125 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -449,22 +935,48 @@ private class KinesisUtilsPythonHelper { checkpointInterval: Duration, storageLevel: StorageLevel, awsAccessKeyId: String, - awsSecretKey: String - ): JavaReceiverInputDStream[Array[Byte]] = { + awsSecretKey: String, + stsAssumeRoleArn: String, + stsSessionName: String, + stsExternalId: String): JavaReceiverInputDStream[Array[Byte]] = { +// scalastyle:on +if (!(stsAssumeRoleArn != null && stsSessionName != null && stsExternalId != null) +&& !(stsAssumeRoleArn == null && stsSessionName == null && stsExternalId == null)) { + throw new IllegalArgumentException("stsAssumeRoleArn, stsSessionName, and stsExtenalId " + +"must all be defined or all be null") +} + +if (stsAssumeRoleArn != null && stsSessionName != null && stsExternalId != null) { + validateAwsCreds(awsAccessKeyId, awsSecretKey) + if (awsAccessKeyId == null && awsSecretKey == null) { +KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName, --- End diff -- This is actually in the ```KinesisUtilsPythonHelper``` object so we'll either need to qualify ```KinesisUtils``` or do an ```import KinesisUtils._``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99907831 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -449,22 +935,48 @@ private class KinesisUtilsPythonHelper { checkpointInterval: Duration, storageLevel: StorageLevel, awsAccessKeyId: String, - awsSecretKey: String - ): JavaReceiverInputDStream[Array[Byte]] = { + awsSecretKey: String, + stsAssumeRoleArn: String, + stsSessionName: String, + stsExternalId: String): JavaReceiverInputDStream[Array[Byte]] = { +// scalastyle:on +if (!(stsAssumeRoleArn != null && stsSessionName != null && stsExternalId != null) +&& !(stsAssumeRoleArn == null && stsSessionName == null && stsExternalId == null)) { + throw new IllegalArgumentException("stsAssumeRoleArn, stsSessionName, and stsExtenalId " + --- End diff -- See previous comment in KinesisUtils. Obviously it's easier to deal with optional arguments for the Python API but I'd favor keeping this consistent with the definitions of ```createStream()``` for Java/Scala --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99906733 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -123,9 +123,143 @@ object KinesisUtils { // scalastyle:on val cleanedHandler = ssc.sc.clean(messageHandler) ssc.withNamedScope("kinesis stream") { + val kinesisCredsProvider = BasicCredentialsProvider( +awsAccessKeyId = awsAccessKeyId, +awsSecretKey = awsSecretKey) new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, -cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))) +cleanedHandler, kinesisCredsProvider) +} + } + + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. + * + * @param ssc StreamingContext object + * @param kinesisAppName Kinesis application name used by the Kinesis Client Library + *(KCL) to update DynamoDB + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName Name of region used by the Kinesis Client Library (KCL) to update + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + *See the Kinesis Spark Streaming documentation for more + *details on the different types of checkpoints. + * @param storageLevel Storage level to use for storing the received objects. + * StorageLevel.MEMORY_AND_DISK_2 is recommended. + * @param messageHandler A custom message handler that can generate a generic output from a + * Kinesis `Record`, which contains both message data, and metadata. + * @param stsAssumeRoleArn ARN of IAM role to assume when using STS sessions to read from + * Kinesis stream. + * @param stsSessionName Name to uniquely identify STS sessions if multiple princples assume + * the same role. + * @param stsExternalId External ID that can be used to validate against the assumed IAM role's + * trust policy. + * + * @note The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain + * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain + * gets the AWS credentials. + */ + // scalastyle:off + def createStream[T: ClassTag]( + ssc: StreamingContext, + kinesisAppName: String, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointInterval: Duration, + storageLevel: StorageLevel, + messageHandler: Record => T, + stsAssumeRoleArn: String, + stsSessionName: String, + stsExternalId: String): ReceiverInputDStream[T] = { --- End diff -- The external ID is optional but I'm making it required in ```KinesisUtils``` since otherwise we'll need to double the number of overrides of ```createStream()``` (e.g. for STS params with/without stsExternalId rather than just STS prams with stsExternalId). 
I think the API was constructed in this fashion in order to have consistent method declarations between Scala and Java. I think the better long-term solution here is to deprecate ```createStream()``` in favor of a builder class for constructing Kinesis DStreams. If the external ID isn't specified in the trust policy of the IAM role being assumed it will simply be ignored. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
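For reference, the AWS SDK side of an assumed-role provider with an optional external ID looks roughly like the sketch below (not necessarily the exact code in this PR, but built from SDK calls that do exist; it shows why the external ID maps naturally onto an ```Option``` internally even if ```createStream()``` takes it as a required argument):

```scala
import com.amazonaws.auth.{AWSCredentialsProvider, DefaultAWSCredentialsProviderChain,
  STSAssumeRoleSessionCredentialsProvider}

// Sketch: the external ID is applied conditionally on the SDK builder, so the
// provider can be constructed with or without it.
def assumeRoleProvider(
    roleArn: String,
    sessionName: String,
    externalId: Option[String],
    longLivedCreds: AWSCredentialsProvider = new DefaultAWSCredentialsProviderChain)
  : AWSCredentialsProvider = {
  val builder = new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
    .withLongLivedCredentialsProvider(longLivedCreds)
  externalId.map(builder.withExternalId).getOrElse(builder).build()
}
```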
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99905835 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -34,11 +35,56 @@ import org.apache.spark.streaming.Duration import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} import org.apache.spark.util.Utils -private[kinesis] -case class SerializableAWSCredentials(accessKeyId: String, secretKey: String) - extends AWSCredentials { - override def getAWSAccessKeyId: String = accessKeyId - override def getAWSSecretKey: String = secretKey +/** + * Serializable interface providing a method executors can call to obtain an + * AWSCredentialsProvider instance for authenticating to AWS services. + */ +private[kinesis] trait SerializableCredentialsProvider extends Serializable { --- End diff -- Will do --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99905600 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -23,7 +23,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.control.NonFatal -import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain} +import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, AWSStaticCredentialsProvider, --- End diff -- Will do --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99905664 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala --- @@ -123,9 +123,143 @@ object KinesisUtils { // scalastyle:on val cleanedHandler = ssc.sc.clean(messageHandler) ssc.withNamedScope("kinesis stream") { + val kinesisCredsProvider = BasicCredentialsProvider( +awsAccessKeyId = awsAccessKeyId, +awsSecretKey = awsSecretKey) new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, -cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))) +cleanedHandler, kinesisCredsProvider) +} + } + + /** + * Create an input stream that pulls messages from a Kinesis stream. + * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. + * + * @param ssc StreamingContext object + * @param kinesisAppName Kinesis application name used by the Kinesis Client Library + *(KCL) to update DynamoDB + * @param streamName Kinesis stream name + * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + * @param regionName Name of region used by the Kinesis Client Library (KCL) to update + * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the + * worker's initial starting position in the stream. + * The values are either the beginning of the stream + * per Kinesis' limit of 24 hours + * (InitialPositionInStream.TRIM_HORIZON) or + * the tip of the stream (InitialPositionInStream.LATEST). + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + *See the Kinesis Spark Streaming documentation for more + *details on the different types of checkpoints. + * @param storageLevel Storage level to use for storing the received objects. + * StorageLevel.MEMORY_AND_DISK_2 is recommended. + * @param messageHandler A custom message handler that can generate a generic output from a + * Kinesis `Record`, which contains both message data, and metadata. + * @param stsAssumeRoleArn ARN of IAM role to assume when using STS sessions to read from + * Kinesis stream. + * @param stsSessionName Name to uniquely identify STS sessions if multiple princples assume + * the same role. + * @param stsExternalId External ID that can be used to validate against the assumed IAM role's + * trust policy. + * + * @note The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain + * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain + * gets the AWS credentials. 
+ */ + // scalastyle:off + def createStream[T: ClassTag]( + ssc: StreamingContext, + kinesisAppName: String, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointInterval: Duration, + storageLevel: StorageLevel, + messageHandler: Record => T, + stsAssumeRoleArn: String, + stsSessionName: String, + stsExternalId: String): ReceiverInputDStream[T] = { +// scalastyle:on +val cleanedHandler = ssc.sc.clean(messageHandler) +// Setting scope to override receiver stream's scope of "receiver stream" +ssc.withNamedScope("kinesis stream") { + val kinesisCredsProvider = STSCredentialsProvider( +stsRoleArn = stsAssumeRoleArn, +stsSessionName = stsSessionName, +stsExternalId = Some(stsExternalId)) --- End diff -- Will do --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: rev
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99905577 --- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala --- @@ -62,9 +62,20 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft checkpointerMock = mock[IRecordProcessorCheckpointer] } - test("check serializability of SerializableAWSCredentials") { -Utils.deserialize[SerializableAWSCredentials]( - Utils.serialize(new SerializableAWSCredentials("x", "y"))) + test("check serializability of credential provider classes") { +Utils.deserialize[BasicCredentialsProvider]( + Utils.serialize(BasicCredentialsProvider( +awsAccessKeyId = "x", +awsSecretKey = "y"))) + +Utils.deserialize[STSCredentialsProvider]( + Utils.serialize(STSCredentialsProvider( +stsRoleArn = "fakeArn", +stsSessionName = "fakeSessionName", +stsExternalId = Some("fakeExternalId"), +longLivedCredsProvider = BasicCredentialsProvider( --- End diff -- I'll add another test ```longLivedCredentialsProvider.``` I ran into errors making similar ser/de test for ```DefaultCredentialsProvider``` since it's a case class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797

> Can we write such schema (conflicting columns after lower-casing) into metastore?

I think the scenario here would be that the metastore contains a single lower-case column name that could resolve to multiple case-sensitive column names in the underlying Parquet file. This could've happened via the user manually executing a ```CREATE TABLE ...``` query with an explicit schema. Since the metastore itself isn't really defining expected behavior in this case, I think we can just consider this undefined behavior and return the first field that matches alphabetically. I don't think this is very likely to be a legitimate usecase, but it's good to point out edge cases. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
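To make the edge case concrete, a tiny sketch of the ambiguity and the alphabetical tie-break suggested above (illustrative only, not code from this PR):

```scala
// Two Parquet fields collapse onto the same lowercased metastore column name.
val parquetFields = Seq("eventId", "eventID")
val metastoreColumn = "eventid"

// Deterministic "undefined behavior": take the first matching field
// alphabetically rather than whichever happens to fall out of a map.
val resolved = parquetFields
  .filter(_.toLowerCase == metastoreColumn)
  .sorted
  .headOption   // Some("eventID"), since uppercase sorts before lowercase
```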
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797 > BTW, what behavior do we expect if a parquet file has two columns whose lower-cased names are identical? I can take a look at how Spark handled this prior to 2.1, although I'm not sure if the behavior we'll see there was the result of a conscious decision or "undefined" behavior. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797

> how about we add a new SQL command to refresh the table schema in metastore by inferring schema with data files? This is a compatibility issue and we should have provided a way for users to migrate, before the 2.1 release. I think this approach is much simpler than adding a flag.

While I think introducing a command for inferring and storing the table's case-sensitive schema as a property would be a welcome addition, I think requiring this property to be there in order for Spark SQL to function properly with case-sensitive data files could really restrict the settings Spark SQL can be used in. If a user wanted to use Spark SQL to query over an existing warehouse containing hundreds or even thousands of tables, under the suggested approach a Spark job would have to be run to infer the schema of each and every table. Even though file formats such as Parquet store their schemas as metadata, there still could potentially be millions of files to inspect for the warehouse. A less amenable format like JSON might require scanning all the data in the warehouse. This also doesn't cover the use case @viirya pointed out where the user may not have write access to the metastore they are querying against. In this case, the user would have to rely on the warehouse administrator to create the Spark schema property for every table they wish to query.

> For tables created by hive, as hive is a case-insensitive system, will the parquet files have mixed-case schema?

I think the Hive Metastore has become a bit of an open standard for maintaining a data warehouse catalog since so many tools integrate with it. I wouldn't assume that the underlying data pointed to by an external metastore was created or managed by Hive itself. For example, we maintain a Hive Metastore that catalogs case-sensitive files written by our Spark-based ETL pipeline, which parses case classes from string data and writes them as Parquet. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 PR has been amended to reflect feedback. Thanks for taking a look, @brkyvz. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797

> Should we roll these behaviors into one flag? e.g. ```spark.sql.hive.mixedCaseSchemaSupport```

That sounds reasonable to me. The only thing I wonder about is if there's any use case where we want to infer the schema but not attempt to write it back as a property, say if the external metastore doesn't permit table property updates from the user. We can always just log the failure, but this could be noisy for users expecting this behavior by default. This could be solved by adding an INFER_WITHOUT_SAVING mode. I'll leave the PR open for now so we can hear and discuss @mallman's input, but if we're all on board with this approach I'll eventually close this out in favor of a new PR adding configurable schema inference behavior. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99718950 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala --- @@ -35,10 +36,65 @@ import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListen import org.apache.spark.util.Utils private[kinesis] -case class SerializableAWSCredentials(accessKeyId: String, secretKey: String) - extends AWSCredentials { - override def getAWSAccessKeyId: String = accessKeyId - override def getAWSSecretKey: String = secretKey +case class SerializableKCLAuthProvider( --- End diff -- Was hoping for some feedback here. I think making this an interface with split basic/STS implementations should work well. I'll give it a shot. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99718545 --- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisExampleUtils.scala --- @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +private[streaming] object KinesisExampleUtils { + def getRegionNameByEndpoint(endpoint: String): String = endpoint.split("\\.")(1) --- End diff -- Sounds good. Went for a quick fix but this is much nicer. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797 I'll double check, but I don't think ```spark.sql.hive.manageFilesourcePartitions=false``` would solve this issue since we're still deriving the file relation's dataSchema parameter from the schema of MetastoreRelation. The call to ```fileFormat.inferSchema()``` has been removed entirely. If Spark SQL is set on using a table property to store the case-sensitive schema then I think having a way to backfill this property for existing < 2.1 tables as well as tables not created or managed by Spark will be a necessity. If the cleanest way to deal with this case sensitivity problem is to bring back schema inference then I think a good option would be to introduce a configuration param to indicate whether or not an inferred schema should be written back to the table as a property. We could also introduce another config param that allows a user to bypass schema inference even if a case-sensitive schema can't be read from the table properties. This could be helpful for users who would like to query external Hive tables that aren't managed by Spark and that they know aren't backed by files containing case-sensitive field names. This would basically allow us to support the following use cases:

1) The MetastoreRelation is able to read a case-sensitive schema from the table properties. No inference is necessary.

2) The MetastoreRelation can't read a case-sensitive schema from the table properties. A case-sensitive schema is inferred and, if configured, written back as a table property.

3) The MetastoreRelation can't read a case-sensitive schema from the table properties. The user knows the underlying data files don't contain case-sensitive field names and has explicitly set a config param to skip the inference step.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
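A rough sketch of the decision logic those three cases imply (toy code with placeholder names, not from any Spark patch):

```scala
import org.apache.spark.sql.types.StructType

sealed trait InferenceMode
case object InferAndSave extends InferenceMode   // infer, then backfill the table property
case object InferOnly extends InferenceMode      // infer, but never write to the metastore
case object NeverInfer extends InferenceMode     // trust the (lowercased) metastore schema

def resolveDataSchema(
    schemaFromTableProps: Option[StructType],    // case-sensitive schema, if stored (case 1)
    lowercasedMetastoreSchema: StructType,       // always available, case-insensitive
    inferFromFiles: () => StructType,            // potentially expensive file-based inference
    saveToTableProps: StructType => Unit,
    mode: InferenceMode): StructType =
  schemaFromTableProps.getOrElse {
    mode match {
      case InferAndSave =>                       // case 2, with write access to the metastore
        val inferred = inferFromFiles()
        saveToTableProps(inferred)
        inferred
      case InferOnly =>                          // case 2, read-only metastore
        inferFromFiles()
      case NeverInfer =>                         // case 3, user opts out of inference entirely
        lowercasedMetastoreSchema
    }
  }
```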
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Amending this PR to upgrade the KCL/AWS SDK dependencies to more-current versions (1.7.3 and 1.11.76, respectively). The ```RegionUtils.getRegionByEndpoint()``` API was removed from the SDK, so I've had to replace it with a simple string split method for the examples and test suites that were utilizing it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
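The "simple string split" mentioned here is the same approach as the ```KinesisExampleUtils.getRegionNameByEndpoint()``` helper shown in a diff elsewhere in this thread; roughly:

```scala
// Extracts the region name from a standard Kinesis endpoint of the form
// https://kinesis.<region>.amazonaws.com. A narrow stand-in for the removed
// RegionUtils.getRegionByEndpoint(), not a general-purpose replacement.
def getRegionNameByEndpoint(endpoint: String): String = endpoint.split("\\.")(1)

getRegionNameByEndpoint("https://kinesis.us-east-1.amazonaws.com")  // "us-east-1"
```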
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797 Bringing back schema inference is certainly a much cleaner option, although I imagine doing this in the old manner would negate the performance improvements brought by #14690 for any non-Spark 2.1 dataset. Ideally, I think we would infer the schema only from the pruned partition list for tables we can't read a case-sensitive schema for. Unless I'm mistaken, this would have to happen during optimization of the logical plan, after the PruneFileSourcePartitions rule has been applied. My thought is that we could write a rule that passes the pruned file list to the file format's inferSchema() method to replace the HadoopFsRelation's dataSchema with the result. I'm not very familiar with Catalyst though, so I'm not sure if changing the relation's schema during optimization will cause problems. There is [an open PR to add support for case-insensitive schemas to Parquet](https://github.com/apache/parquet-mr/pull/210) which would be helpful here since it would provide a way to avoid schema inference when your Parquet files have case-sensitive fields but you don't care about case sensitivity when querying. Unfortunately the PR seems to be more or less abandoned though. Pinging @mallman, the author of #14690, to see if he has any input on this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16797: [SPARK-19455][SQL] Add option for case-insensitiv...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16797#discussion_r99458106 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -268,13 +292,23 @@ private[parquet] object ParquetReadSupport { * @return A list of clipped [[GroupType]] fields, which can be empty. */ private def clipParquetGroupFields( - parquetRecord: GroupType, structType: StructType): Seq[Type] = { -val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + parquetRecord: GroupType, + structType: StructType, + caseInsensitive: Boolean): Seq[Type] = { +val parquetFieldMap = { + val pairs = parquetRecord.getFields.asScala.map(f => f.getName -> f) + implicit val ordering = if (caseInsensitive) { +Ordering.by[String, String](_.toLowerCase) + } else { +Ordering.String + } + TreeMap(pairs: _*) --- End diff -- In the current implementation, we exploit the sorted map keyspace to define an ordering where, if configured, lookups are case-insensitive due to sorting by the downcased key. We only use the values in the map at one point though, so I'll just roll it back to using a standard Map and add an if/else for downcasing the key when creating/accessing the map. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
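The "standard Map plus an if/else" variant described here could look roughly like the following (a sketch of the idea, not the exact patch):

```scala
import scala.collection.JavaConverters._
import org.apache.parquet.schema.{GroupType, Type}

// Build the Parquet field lookup with lowercased keys when resolving
// case-insensitively, and look names up in the matching form. Note that two
// fields whose names collide after lowercasing would need an explicit
// tie-break; a plain toMap silently keeps the last one.
def parquetFieldMap(parquetRecord: GroupType, caseInsensitive: Boolean): Map[String, Type] = {
  val fields = parquetRecord.getFields.asScala
  if (caseInsensitive) {
    fields.map(f => f.getName.toLowerCase -> f).toMap
  } else {
    fields.map(f => f.getName -> f).toMap
  }
}

def lookupField(
    fieldMap: Map[String, Type],
    name: String,
    caseInsensitive: Boolean): Option[Type] =
  fieldMap.get(if (caseInsensitive) name.toLowerCase else name)
```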
[GitHub] spark pull request #16797: [SPARK-19455][SQL] Add option for case-insensitiv...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16797#discussion_r99456138 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -268,13 +292,23 @@ private[parquet] object ParquetReadSupport { * @return A list of clipped [[GroupType]] fields, which can be empty. */ private def clipParquetGroupFields( - parquetRecord: GroupType, structType: StructType): Seq[Type] = { -val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap + parquetRecord: GroupType, + structType: StructType, + caseInsensitive: Boolean): Seq[Type] = { +val parquetFieldMap = { + val pairs = parquetRecord.getFields.asScala.map(f => f.getName -> f) + implicit val ordering = if (caseInsensitive) { +Ordering.by[String, String](_.toLowerCase) + } else { +Ordering.String + } + TreeMap(pairs: _*) --- End diff -- The implicit ordering val determines if the TreeMap behaves in a case-sensitive manner or not. I can rework this into using standard Maps if you feel that's appropriate or clearer. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
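For reference, a self-contained sketch of the `TreeMap` approach under discussion here; the implicit `Ordering` alone decides whether key lookups compare case-sensitively. This is simplified from the diff above rather than copied from it.

```scala
import scala.collection.immutable.TreeMap

// The implicit Ordering picked here controls how TreeMap compares keys on lookup.
def buildFieldMap[V](pairs: Seq[(String, V)], caseInsensitive: Boolean): TreeMap[String, V] = {
  implicit val ordering: Ordering[String] =
    if (caseInsensitive) Ordering.by[String, String](_.toLowerCase) else Ordering.String
  TreeMap(pairs: _*)
}

// With caseInsensitive = true, a lookup for "foo" matches an entry keyed "Foo":
// buildFieldMap(Seq("Foo" -> 1), caseInsensitive = true).get("foo") == Some(1)
```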
[GitHub] spark pull request #16797: [SPARK-19455][SQL] Add option for case-insensitiv...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16797#discussion_r99455967 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -249,10 +249,18 @@ object SQLConf { val PARQUET_VECTORIZED_READER_ENABLED = SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader") - .doc("Enables vectorized parquet decoding.") + .doc("Ingnores case sensitivity differences in field names while resolving Parquet columns " + --- End diff -- My bad. Don't know how that got in. Fixing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797 Looks like SparkR unit tests have been failing for all or most PRs after [this commit](https://github.com/apache/spark/commit/48aafeda7db879491ed36fff89d59ca7ec3136fa). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797 Relevant part of [Jenkins output](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72326/console) for SparkR tests:

```
Error: processing vignette 'sparkr-vignettes.Rmd' failed with diagnostics:
error in evaluating the argument 'object' in selecting a method for function 'summary': Error: object 'kmeansModel' not found
```

Doesn't appear to be related to this change. I'll investigate and see if I can reproduce it locally. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16797: [SPARK-19455][SQL] Add option for case-insensitive Parqu...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16797 Pinging @ericl, @cloud-fan and @davies, committers who have all reviewed or submitted changes related to this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16797: [SPARK-19455][SQL] Add option for case-insensitiv...
GitHub user budde opened a pull request: https://github.com/apache/spark/pull/16797 [SPARK-19455][SQL] Add option for case-insensitive Parquet field resolution

## What changes were proposed in this pull request?

**Summary**
- Add spark.sql.parquet.caseInsensitiveResolution config option
- Add caseInsensitive option to ParquetReadSupport.clipParquetType
- Add ParquetIOSuite test
- Disable Parquet filter push-down when using case-insensitive field resolution

**Details** [*Copied from SPARK-19455*](https://issues.apache.org/jira/browse/SPARK-19455)

[SPARK-16980](https://issues.apache.org/jira/browse/SPARK-16980) removed the schema inference from the HiveMetastoreCatalog class when converting a MetastoreRelation to a LogicalRelation (HadoopFsRelation, in this case) in favor of simply using the schema returned by the metastore. This results in an optimization, as the underlying file statuses no longer need to be resolved until after the partition pruning step, reducing the number of files to be touched significantly in some cases. The downside is that the data schema used may no longer match the underlying file schema for case-sensitive formats such as Parquet. This change initially included a [patch to ParquetReadSupport](https://github.com/apache/spark/blob/6ce1b675ee9fc9a6034439c3ca00441f9f172f84/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala#L270-L284) that attempted to remedy this conflict by using a case-insensitive fallback mapping when resolving field names during the schema clipping step. [SPARK-18333](https://issues.apache.org/jira/browse/SPARK-18333) later removed this patch after [SPARK-17183](https://issues.apache.org/jira/browse/SPARK-17183) added support for embedding a case-sensitive schema as a Hive Metastore table property. AFAIK the assumption here was that the data schema obtained from the Metastore table property will be case sensitive and should match the Parquet schema exactly. The problem arises when dealing with Parquet-backed tables for which this schema has not been embedded as a table property and for which the underlying files contain case-sensitive field names. This will happen for any Hive table that was not created by Spark or was created by a version prior to 2.1.0. We've seen Spark SQL return no results for any query containing a case-sensitive field name for such tables. The change we're proposing is to introduce a configuration parameter that will re-enable case-insensitive field name resolution in ParquetReadSupport. This option will also disable filter push-down for Parquet, as the filter predicate constructed by Spark SQL contains the case-insensitive field names, for which Parquet will return 0 records when filtering against a case-sensitive column name. I was hoping to find a way to construct the filter on-the-fly in ParquetReadSupport, but Parquet doesn't propagate the Configuration object passed to this class to the underlying InternalParquetRecordReader class.

## How was this patch tested?

This change re-introduces a unit test in ParquetSchemaSuite.scala to verify that case-insensitive schema clipping behaves as expected. It also introduces a ParquetIOSuite unit test that constructs a case-insensitive catalog table and ensures case-sensitive Parquet data can still be queried against it. Please review http://spark.apache.org/contributing.html before opening a pull request. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/budde/spark SPARK-19455 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16797.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16797 commit 5426271946419a9defb59bb84575501bc8296578 Author: Budde <bu...@amazon.com> Date: 2017-02-02T07:34:15Z [SPARK-19455][SQL] Add option for case-insensitive Parquet field resolution - Add spark.sql.parquet.caseInsensitiveResolution config option - Add caseInsensitive option to ParquetReadSupport.clipParquetType - Add ParquetIOSuite test - Disable Parquet filter push-down when using case-insensitive field resolution --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
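As a hedged sketch of how the proposed option would be used: the config key is taken from the summary above, while the table and column names below are invented for illustration and the exact semantics are whatever the final patch implements.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Suppose the Parquet files carry a case-sensitive field "eventTime" while the
// Hive Metastore reports the lowercased "eventtime". Without the option, schema
// clipping finds no matching Parquet column and the query silently returns no rows.
spark.conf.set("spark.sql.parquet.caseInsensitiveResolution", "true")

spark.sql("SELECT eventtime FROM legacy_parquet_table WHERE eventtime IS NOT NULL").show()
```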
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/16744#discussion_r99217534 --- Diff: pom.xml --- @@ -146,6 +146,8 @@ hadoop2 0.7.1 1.6.2 + +1.10.61 --- End diff -- I believe there was previously a direct dependency on the AWS SDK but it is currently getting pulled in as a transitive dependency of the Kinesis Client Library. The KCL dependencies don't include the aws-java-sdk-sts Maven artifact so we must add it as an explicit dependency in the pom.xml for kinesis-asl. The AWS SDK is licensed under [Apache 2.0](https://github.com/aws/aws-sdk-java/blob/master/LICENSE.txt) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Pinging @brkyvz as well, who also appears to have reviewed kinesis-asl changes in the past --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 There shouldn't be any change to behavior or compatibility when using the existing implementations of ```KinesisUtils.createStream()```. The only drawback I can think of is that this makes the ```createStream()``` API more complex by introducing an additional set of optional config values, which in turn necessitates an additional set of overridden interface implementations. I think the longer-term solution here is to introduce a builder-style API for generating Kinesis streams and eventually put the existing ```KinesisUtils.createStream()``` on the deprecation path, but I've chosen to bite the bullet and just extend ```createStream()``` further in the interest of making this a minimal change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
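Purely as an illustration of the builder-style direction mentioned in the comment above, such an API might look like the following; every name here is invented for the sketch and is not part of Spark.

```scala
// Hypothetical, immutable builder; every setter returns a new builder instance.
case class KinesisStreamConfig(
    appName: Option[String] = None,
    streamName: Option[String] = None,
    endpointUrl: Option[String] = None,
    regionName: Option[String] = None,
    stsAssumeRoleArn: Option[String] = None,
    stsSessionName: Option[String] = None,
    stsExternalId: Option[String] = None)

class KinesisStreamBuilder private (config: KinesisStreamConfig) {
  def appName(v: String): KinesisStreamBuilder = new KinesisStreamBuilder(config.copy(appName = Some(v)))
  def streamName(v: String): KinesisStreamBuilder = new KinesisStreamBuilder(config.copy(streamName = Some(v)))
  def endpointUrl(v: String): KinesisStreamBuilder = new KinesisStreamBuilder(config.copy(endpointUrl = Some(v)))
  def regionName(v: String): KinesisStreamBuilder = new KinesisStreamBuilder(config.copy(regionName = Some(v)))
  def stsAssumeRole(arn: String, sessionName: String, externalId: Option[String] = None): KinesisStreamBuilder =
    new KinesisStreamBuilder(config.copy(
      stsAssumeRoleArn = Some(arn), stsSessionName = Some(sessionName), stsExternalId = externalId))
  def build(): KinesisStreamConfig = config
}

object KinesisStreamBuilder {
  def apply(): KinesisStreamBuilder = new KinesisStreamBuilder(KinesisStreamConfig())
}
```

With a shape like this, optional STS settings become ordinary builder calls rather than ever-longer ```createStream()``` overloads, which is exactly the maintenance concern raised above.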
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Pinging @zsxwing and @srowen, additional committers who have previously reviewed kinesis-asl changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Pinging @tdas on this-- looks like you're the committer who has contributed the most to kinesis-asl. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Also, on another note, the ```SerializableKCLAuthProvider``` class that **SparkQA** is identifying as a new public class is actually package private and replaced another package private class (```SerializableAWSCredentials```). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 The JIRA I opened for this issue contains further details and background. Linking to it here for good measure: * https://issues.apache.org/jira/browse/SPARK-19405 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16744: [SPARK-19405][STREAMING] Support for cross-account Kines...
Github user budde commented on the issue: https://github.com/apache/spark/pull/16744 Missed the code in python/streaming that this touches. Will update PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16744: [SPARK-19405][STREAMING] Support for cross-accoun...
GitHub user budde opened a pull request: https://github.com/apache/spark/pull/16744 [SPARK-19405][STREAMING] Support for cross-account Kinesis reads via STS
- Add dependency on aws-java-sdk-sts
- Replace SerializableAWSCredentials with new SerializableKCLAuthProvider class
- Make KinesisReceiver take SerializableKCLAuthProvider as argument and pass credential provider to KCL
- Add new implementations of KinesisUtils.createStream() that take STS arguments
- Make JavaKinesisStreamSuite test the entire KinesisUtils Java API

## What changes were proposed in this pull request?

* Replace SerializableAWSCredentials with new SerializableKCLAuthProvider class that takes 5 optional config params for configuring AWS auth and returns the appropriate credential provider object
* Add new public createStream() APIs for specifying these parameters in KinesisUtils

## How was this patch tested?

* Manually tested using an explicit keypair and instance profile to read data from a Kinesis stream in a separate account (difficult to write a test orchestrating creation and assumption of IAM roles across separate accounts)
* Expanded JavaKinesisStreamSuite to test the entire Java API in KinesisUtils

## License acknowledgement

This contribution is my original work and I license the work to the project under the project's open source license.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/budde/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16744.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16744 commit 4786cdec136b717b1a933e67b61fe79ce3e3ce1e Author: Budde <bu...@amazon.com> Date: 2017-01-17T18:21:06Z [SPARK-19405][STREAMING] Add support to KinesisUtils for cross-account Kinesis reads via STS - Add dependency on aws-java-sdk-sts - Replace SerializableAWSCredentials with new SerializableKCLAuthProvider class - Make KinesisReceiver take SerializableKCLAuthProvider as argument and pass credential provider to KCL - Add new implementations of KinesisUtils.createStream() that take STS arguments - Make JavaKinesisStreamSuite test the entire KinesisUtils Java API --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
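As a rough sketch of the mechanics involved (not the PR's actual SerializableKCLAuthProvider code), an STS-backed credentials provider for the KCL can be assembled from the aws-java-sdk-sts artifact this PR adds as a dependency. The builder method names below are taken from the AWS Java SDK and should be verified against the SDK version pinned in the pom.

```scala
import com.amazonaws.auth.{AWSCredentialsProvider, DefaultAWSCredentialsProviderChain, STSAssumeRoleSessionCredentialsProvider}

// Build a provider that assumes the given IAM role via STS, optionally scoped
// with an external ID, using the default chain for the long-lived credentials.
def stsCredentialsProvider(
    roleArn: String,
    sessionName: String,
    externalId: Option[String] = None): AWSCredentialsProvider = {
  val builder = new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
    .withLongLivedCredentialsProvider(new DefaultAWSCredentialsProviderChain())
  externalId.map(builder.withExternalId).getOrElse(builder).build()
}
```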
[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...
Github user budde commented on the pull request: https://github.com/apache/spark/pull/11012#issuecomment-178853877 From Jenkins output:
>Fetching upstream changes from https://github.com/apache/spark.git
> git --version # timeout=10
> git fetch --tags --progress https://github.com/apache/spark.git +refs/pull/11012/*:refs/remotes/origin/pr/11012/* # timeout=15
ERROR: Timeout after 15 minutes
ERROR: Error fetching remote repo 'origin'
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/11012#discussion_r51645315 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -304,10 +309,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // release the unroll memory yet. Instead, we transfer it to pending unroll memory // so `tryToPut` can further transfer it to normal storage memory later. // TODO: we can probably express this without pending unroll memory (SPARK-10907) - val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved --- End diff -- @nongli - the problem is that the original implementation assumes that previousMemoryReserved is an invariant representing the number of unroll bytes allocated for the process besides the pending bytes allocated during the unroll, but no synchronization exists to enforce this invariant. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/11012#discussion_r51643796 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -304,10 +309,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // release the unroll memory yet. Instead, we transfer it to pending unroll memory // so `tryToPut` can further transfer it to normal storage memory later. // TODO: we can probably express this without pending unroll memory (SPARK-10907) - val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved --- End diff -- Per my earlier comment, I updated the PR to use a var named previousMemoryReserved to manually track the number of unroll bytes allocated during a given invocation of unrollSafely rather than relying on unrollMemoryMap(taskAttemptId) not being modified outside of the given thread between the assignment to previousMemoryReserved and the memory maps being updated in the finally { } block. This should remove the need to make the whole method synchronized. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
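A minimal, self-contained sketch of that bookkeeping change (simplified names, not the actual MemoryStore patch): the bytes reserved by this invocation are counted in a local var, so the finally block no longer assumes the shared per-task map was untouched by other threads sharing the same task attempt id.

```scala
import scala.collection.mutable

object UnrollBookkeepingSketch {
  private val unrollMemoryMap = mutable.Map[Long, Long]().withDefaultValue(0L)
  private val pendingUnrollMemoryMap = mutable.Map[Long, Long]().withDefaultValue(0L)
  private val lock = new Object

  def unrollSafely(taskAttemptId: Long, chunkSizes: Seq[Long]): Unit = {
    var reservedByThisCall = 0L                      // local, immune to other threads
    try {
      chunkSizes.foreach { bytes =>
        lock.synchronized { unrollMemoryMap(taskAttemptId) += bytes }
        reservedByThisCall += bytes
      }
    } finally {
      lock.synchronized {
        // Transfer only what this call reserved to pending unroll memory; other
        // threads' reservations under the same task attempt id are left alone.
        unrollMemoryMap(taskAttemptId) -= reservedByThisCall
        pendingUnrollMemoryMap(taskAttemptId) += reservedByThisCall
      }
    }
  }
}
```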
[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...
Github user budde commented on the pull request: https://github.com/apache/spark/pull/11012#issuecomment-178940544 Looks like a bunch of Spark SQL/Hive tests are failing due to this error: >Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 4892.0 failed 1 times, most recent failure: Lost task 19.0 in stage 4892.0 (TID 69335, localhost): java.lang.RuntimeException: Stream '/jars/TestUDTF.jar' was not found. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...
Github user budde commented on the pull request: https://github.com/apache/spark/pull/11012#issuecomment-178929153 Latest change is looking good on my end. No unroll memory is being leaked. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...
Github user budde commented on the pull request: https://github.com/apache/spark/pull/11012#issuecomment-178766741 Updated PR with new implementation that uses a counter variable instead of requiring the whole method to be atomic. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...
Github user budde commented on the pull request: https://github.com/apache/spark/pull/11012#issuecomment-178314141 Pinging @andrewor14, the original implementor of unrollSafely(), for any potential feedback. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13122] Fix race condition in MemoryStor...
GitHub user budde opened a pull request: https://github.com/apache/spark/pull/11012 [SPARK-13122] Fix race condition in MemoryStore.unrollSafely() https://issues.apache.org/jira/browse/SPARK-13122

A race condition can occur in MemoryStore's unrollSafely() method if two threads that return the same value for currentTaskAttemptId() execute this method concurrently. This change makes the operation of reading the initial amount of unroll memory used, performing the unroll, and updating the associated memory maps atomic in order to avoid this race condition. The initial proposed fix wraps all of unrollSafely() in a memoryManager.synchronized { } block. A cleaner approach might be to introduce a mechanism that synchronizes based on task attempt ID. An alternative option might be to track unroll/pending unroll memory based on block ID rather than task attempt ID.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/budde/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/11012.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #11012 commit 6e0156c6e3c2fd137005ce3d55cecdf7070795da Author: Adam Budde <bu...@amazon.com> Date: 2016-02-02T01:04:45Z [SPARK-13122] Fix race condition in MemoryStore.unrollSafely() https://issues.apache.org/jira/browse/SPARK-13122 A race condition can occur in MemoryStore's unrollSafely() method if two threads that return the same value for currentTaskAttemptId() execute this method concurrently. This change makes the operation of reading the initial amount of unroll memory used, performing the unroll, and updating the associated memory maps atomic in order to avoid this race condition. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
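A condensed sketch of the initially proposed shape of the fix, with stand-in names (the real method unrolls an iterator and reserves memory incrementally): hold one lock across the read, the unroll, and the map updates so another thread with the same task attempt id cannot interleave between them.

```scala
import scala.collection.mutable

object SynchronizedUnrollSketch {
  private val memoryManager = new Object
  private val unrollMemoryMap = mutable.Map[Long, Long]().withDefaultValue(0L)
  private val pendingUnrollMemoryMap = mutable.Map[Long, Long]().withDefaultValue(0L)

  def unrollSafely(taskAttemptId: Long, bytesToUnroll: Long): Unit = memoryManager.synchronized {
    val previousMemoryReserved = unrollMemoryMap(taskAttemptId)   // read
    unrollMemoryMap(taskAttemptId) += bytesToUnroll               // "unroll"
    val amountToTransferToPending = unrollMemoryMap(taskAttemptId) - previousMemoryReserved
    unrollMemoryMap(taskAttemptId) -= amountToTransferToPending   // update
    pendingUnrollMemoryMap(taskAttemptId) += amountToTransferToPending
  }
}
```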
[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/5214#discussion_r27315420 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala --- @@ -775,6 +777,32 @@ private[sql] object ParquetRelation2 extends Logging { }) } + /** + * Returns the original schema from the Parquet file with any missing nullable fields from the + * Hive Metastore schema merged in. + * + * When constructing a DataFrame from a collection of structured data, the resulting object has + * a schema corresponding to the union of the fields present in each element of the collection. + * Spark SQL simply assigns a null value to any field that isn't present for a particular row. + * In some cases, it is possible that a given table partition stored as a Parquet file doesn't + * contain a particular nullable field in its schema despite that field being present in the + * table schema obtained from the Hive Metastore. This method returns a schema representing the + * Parquet file schema along with any additional nullable fields from the Metastore schema + * merged in. + */ + private[parquet] def mergeMissingNullableFields( + metastoreSchema: StructType, + parquetSchema: StructType): StructType = { +val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap +val missingFields = metastoreSchema + .map(_.name.toLowerCase) + .diff(parquetSchema.map(_.name.toLowerCase)) + .map(fieldMap(_)) + .filter(_.nullable) +StructType(parquetSchema ++ missingFields) --- End diff -- How should we deal with potential ambiguities that may be introduced due to #5141? For instance, say we are merging the following schemas:

Metastore schema | Parquet schema
---|---
Foo | Foo
Bar | Bar
*Baz* | *Bop*
Bat | Bat

The following options come to mind:
* Attempt to merge the orderings and accept any possibility when there are ambiguities (e.g. both `Foo Bar Baz Bop Bat` and `Foo Bar Bop Baz Bat` are acceptable).
* The fields defined in the metastore schema always come first, in order, followed by any additional fields defined in the Parquet schema (e.g. `Foo Bar Baz Bat Bop` is the only accepted ordering).

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/5214#discussion_r27311560 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala --- @@ -775,6 +777,32 @@ private[sql] object ParquetRelation2 extends Logging { }) } + /** + * Returns the original schema from the Parquet file with any missing nullable fields from the + * Hive Metastore schema merged in. + * + * When constructing a DataFrame from a collection of structured data, the resulting object has + * a schema corresponding to the union of the fields present in each element of the collection. + * Spark SQL simply assigns a null value to any field that isn't present for a particular row. + * In some cases, it is possible that a given table partition stored as a Parquet file doesn't + * contain a particular nullable field in its schema despite that field being present in the + * table schema obtained from the Hive Metastore. This method returns a schema representing the + * Parquet file schema along with any additional nullable fields from the Metastore schema + * merged in. + */ + private[parquet] def mergeMissingNullableFields( + metastoreSchema: StructType, + parquetSchema: StructType): StructType = { +val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap +val missingFields = metastoreSchema + .map(_.name.toLowerCase) + .diff(parquetSchema.map(_.name.toLowerCase)) + .map(fieldMap(_)) + .filter(_.nullable) +StructType(parquetSchema ++ missingFields) --- End diff -- What is the expected order of fields in a schema? Is it lexicographic? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/5214#discussion_r27332712 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala --- @@ -775,6 +777,32 @@ private[sql] object ParquetRelation2 extends Logging { }) } + /** + * Returns the original schema from the Parquet file with any missing nullable fields from the + * Hive Metastore schema merged in. + * + * When constructing a DataFrame from a collection of structured data, the resulting object has + * a schema corresponding to the union of the fields present in each element of the collection. + * Spark SQL simply assigns a null value to any field that isn't present for a particular row. + * In some cases, it is possible that a given table partition stored as a Parquet file doesn't + * contain a particular nullable field in its schema despite that field being present in the + * table schema obtained from the Hive Metastore. This method returns a schema representing the + * Parquet file schema along with any additional nullable fields from the Metastore schema + * merged in. + */ + private[parquet] def mergeMissingNullableFields( + metastoreSchema: StructType, + parquetSchema: StructType): StructType = { +val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap +val missingFields = metastoreSchema + .map(_.name.toLowerCase) + .diff(parquetSchema.map(_.name.toLowerCase)) + .map(fieldMap(_)) + .filter(_.nullable) +StructType(parquetSchema ++ missingFields) --- End diff -- I see. Based on the change made in #5141, it looks like the schema returned by **mergeMissingNullableFields()** will still contain any additional fields defined in `parquetSchema` (lines 766-767). How would you feel about simply removing the additional `parquetSchema` fields in the **mergeMissingNullableFields()** method? Execution would look something like this:
* remove additional `parquetSchema` fields
* call **mergeMissingNullableFields()** on the schema with the additional fields removed
* proceed with executing **mergeMetastoreParquetSchema()** with the additions made in #5141 removed (should be unnecessary if we prune additional fields first)
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
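A small sketch of the execution order proposed in this comment, using a hypothetical helper name (the real methods live in ParquetRelation2 and differ in detail):

```scala
import org.apache.spark.sql.types.StructType

// 1) Drop Parquet-only fields, 2) merge missing nullable metastore fields back in.
def pruneAndMerge(metastoreSchema: StructType, parquetSchema: StructType): StructType = {
  val metastoreNames = metastoreSchema.map(_.name.toLowerCase).toSet

  // Step 1: remove additional parquetSchema fields not present in the metastore schema.
  val pruned = parquetSchema.filter(f => metastoreNames.contains(f.name.toLowerCase))

  // Step 2: add nullable metastore fields that the (pruned) Parquet schema is missing.
  val prunedNames = pruned.map(_.name.toLowerCase).toSet
  val missingNullable = metastoreSchema.filter(f => !prunedNames.contains(f.name.toLowerCase) && f.nullable)

  StructType(pruned ++ missingNullable)
}
```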
[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...
Github user budde commented on a diff in the pull request: https://github.com/apache/spark/pull/5214#discussion_r27332969 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala --- @@ -775,6 +777,32 @@ private[sql] object ParquetRelation2 extends Logging { }) } + /** + * Returns the original schema from the Parquet file with any missing nullable fields from the + * Hive Metastore schema merged in. + * + * When constructing a DataFrame from a collection of structured data, the resulting object has + * a schema corresponding to the union of the fields present in each element of the collection. + * Spark SQL simply assigns a null value to any field that isn't present for a particular row. + * In some cases, it is possible that a given table partition stored as a Parquet file doesn't + * contain a particular nullable field in its schema despite that field being present in the + * table schema obtained from the Hive Metastore. This method returns a schema representing the + * Parquet file schema along with any additional nullable fields from the Metastore schema + * merged in. + */ + private[parquet] def mergeMissingNullableFields( + metastoreSchema: StructType, + parquetSchema: StructType): StructType = { +val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap +val missingFields = metastoreSchema + .map(_.name.toLowerCase) + .diff(parquetSchema.map(_.name.toLowerCase)) + .map(fieldMap(_)) + .filter(_.nullable) +StructType(parquetSchema ++ missingFields) --- End diff -- Actually, now that I consider it, I'm not convinced that having the **mergeMissingNullableFields()** method return the fields in non-metastore order is a problem here. Lines 766-767 of **mergeMetastoreParquetSchema()** should handle putting them in the proper order. Removing the additional fields is still an option to consider, however. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...
GitHub user budde opened a pull request: https://github.com/apache/spark/pull/5214 [SPARK-6538][SQL] Add missing nullable Metastore fields when merging a Parquet schema Opening to replace #5188.

When Spark SQL infers a schema for a DataFrame, it will take the union of all field types present in the structured source data (e.g. an RDD of JSON data). When the source data for a row doesn't define a particular field on the DataFrame's schema, a null value will simply be assumed for this field. This workflow makes it very easy to construct tables and query over a set of structured data with a nonuniform schema.

However, this behavior is not consistent in some cases when dealing with Parquet files and an external table managed by an external Hive metastore. In our particular use case, we use Spark Streaming to parse and transform our input data and then apply a window function to save an arbitrary-sized batch of data as a Parquet file, which itself will be added as a partition to an external Hive table via an *ALTER TABLE... ADD PARTITION...* statement. Since our input data is nonuniform, it is expected that not every partition batch will contain every field present in the table's schema obtained from the Hive metastore. As such, we expect that the schema of some of our Parquet files may not contain the same set of fields present in the full metastore schema.

In such cases, it seems natural that Spark SQL would simply assume null values for any missing fields in the partition's Parquet file, assuming these fields are specified as nullable by the metastore schema. This is not the case in the current implementation of ParquetRelation2. The **mergeMetastoreParquetSchema()** method used to reconcile differences between a Parquet file's schema and a schema retrieved from the Hive metastore will raise an exception if the Parquet file doesn't match the same set of fields specified by the metastore.

This pull request alters the behavior of **mergeMetastoreParquetSchema()** by having it first add any nullable fields from the metastore schema to the Parquet file schema if they aren't already present there.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/budde/spark nullable-fields Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/5214.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5214 commit 9041bfa3de565e3bafeaf87fc33a44925a0a0320 Author: Adam Budde bu...@amazon.com Date: 2015-03-25T19:59:34Z Add missing nullable Metastore fields when merging a Parquet schema --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...
Github user budde closed the pull request at: https://github.com/apache/spark/pull/5188 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...
Github user budde commented on the pull request: https://github.com/apache/spark/pull/5188#issuecomment-86625383 Thanks for the input, @marmbrus and @liancheng. I'll resolve the conflicts and open a new PR against master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6538][SQL] Add missing nullable Metasto...
Github user budde commented on the pull request: https://github.com/apache/spark/pull/5214#issuecomment-86699105 Taking a look at why these tests failed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org