[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-07 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r46735



##
File path: hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
##
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import java.time.Instant
+import java.util.Collections
+
+import collection.JavaConverters._
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
+import org.apache.hudi.client.TestBootstrap
+import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.io.TempDir
+
+class TestDataSourceForBootstrap {
+
+  var spark: SparkSession = _
+  val commonOpts = Map(
+    HoodieWriteConfig.INSERT_PARALLELISM -> "4",
+    HoodieWriteConfig.UPSERT_PARALLELISM -> "4",
+    HoodieWriteConfig.DELETE_PARALLELISM -> "4",
+    HoodieWriteConfig.BULKINSERT_PARALLELISM -> "4",
+    HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM -> "4",
+    HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM -> "4",
+    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
+    HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
+  )
+  var basePath: String = _
+  var srcPath: String = _
+  var fs: FileSystem = _
+
+  @BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
+    spark = SparkSession.builder
+      .appName("Hoodie Datasource test")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate
+    basePath = tempDir.toAbsolutePath.toString + "/base"
+    srcPath = tempDir.toAbsolutePath.toString + "/src"
+    fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+  }
+

Review comment:
   Thanks @garyli1019. You were right, I wasn't cleaning up the Spark contexts after my test runs. Fixed it now.
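   For reference, a minimal sketch of the kind of per-test cleanup described above, assuming the shared SparkSession is stopped in a JUnit 5 `@AfterEach` hook; the class and method names are illustrative, not the exact change made in the PR:

    import org.apache.spark.sql.SparkSession
    import org.junit.jupiter.api.AfterEach

    class SparkSessionCleanupSketch {
      var spark: SparkSession = _

      // Stop the session created in @BeforeEach so that each test starts from a
      // fresh SparkContext and local resources/ports are released.
      @AfterEach def cleanUp(): Unit = {
        if (spark != null) {
          spark.stop()
          spark = null
        }
      }
    }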









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-06 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r466748892



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala
##
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+class HudiBootstrapRDD(@transient spark: SparkSession,

Review comment:
   Makes sense to do it in a streamlined fashion. Made the change.









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-06 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r466748555



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -56,29 +58,56 @@ class DefaultSource extends RelationProvider
     val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)

     val path = parameters.get("path")
-    if (path.isEmpty) {
-      throw new HoodieException("'path' must be specified.")
-    }

     if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
-      // this is just effectively RO view only, where `path` can contain a mix of
-      // non-hoodie/hoodie path files. set the path filter up
-      sqlContext.sparkContext.hadoopConfiguration.setClass(
-        "mapreduce.input.pathFilter.class",
-        classOf[HoodieROTablePathFilter],
-        classOf[org.apache.hadoop.fs.PathFilter])
-
-      log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
-      log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
-        "Please query the Hive table registered using Spark SQL.")
-      // simply return as a regular parquet relation
-      DataSource.apply(
-        sparkSession = sqlContext.sparkSession,
-        userSpecifiedSchema = Option(schema),
-        className = "parquet",
-        options = parameters)
-        .resolveRelation()
+      val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)
+      if (path.isEmpty && readPathsStr.isEmpty) {
+        throw new HoodieException(s"'path' or '$READ_PATHS_OPT_KEY' or both must be specified.")
+      }
+
+      val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
+      val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
+
+      val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
+      val globPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
+
+      val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+      log.info("Obtained hudi table path: " + tablePath)
+
+      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+      val bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient)
+
+      val isBootstrappedTable = bootstrapIndex.useIndex()

Review comment:
   Made the change, and have created a followup JIRA 
https://issues.apache.org/jira/browse/HUDI-1157









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-06 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r466320096



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -54,29 +58,54 @@ class DefaultSource extends RelationProvider
     val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)

     val path = parameters.get("path")
-    if (path.isEmpty) {
-      throw new HoodieException("'path' must be specified.")
-    }

     if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
-      // this is just effectively RO view only, where `path` can contain a mix of
-      // non-hoodie/hoodie path files. set the path filter up
-      sqlContext.sparkContext.hadoopConfiguration.setClass(
-        "mapreduce.input.pathFilter.class",
-        classOf[HoodieROTablePathFilter],
-        classOf[org.apache.hadoop.fs.PathFilter])
-
-      log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
-      log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
-        "Please query the Hive table registered using Spark SQL.")
-      // simply return as a regular parquet relation
-      DataSource.apply(
-        sparkSession = sqlContext.sparkSession,
-        userSpecifiedSchema = Option(schema),
-        className = "parquet",
-        options = parameters)
-        .resolveRelation()
+      val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)

Review comment:
   Well right now I added it only for our internal logic to support 
incremental query on bootstrapped tables.
   
   Would you want customers to use this otherwise as well, to be able to 
provide multiple read paths for querying ? Is that the ask here ?









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-06 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r466317612



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -56,29 +58,56 @@ class DefaultSource extends RelationProvider
     val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)

     val path = parameters.get("path")
-    if (path.isEmpty) {
-      throw new HoodieException("'path' must be specified.")
-    }

     if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
-      // this is just effectively RO view only, where `path` can contain a mix of
-      // non-hoodie/hoodie path files. set the path filter up
-      sqlContext.sparkContext.hadoopConfiguration.setClass(
-        "mapreduce.input.pathFilter.class",
-        classOf[HoodieROTablePathFilter],
-        classOf[org.apache.hadoop.fs.PathFilter])
-
-      log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
-      log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
-        "Please query the Hive table registered using Spark SQL.")
-      // simply return as a regular parquet relation
-      DataSource.apply(
-        sparkSession = sqlContext.sparkSession,
-        userSpecifiedSchema = Option(schema),
-        className = "parquet",
-        options = parameters)
-        .resolveRelation()
+      val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)
+      if (path.isEmpty && readPathsStr.isEmpty) {
+        throw new HoodieException(s"'path' or '$READ_PATHS_OPT_KEY' or both must be specified.")
+      }
+
+      val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
+      val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
+
+      val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
+      val globPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
+
+      val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+      log.info("Obtained hudi table path: " + tablePath)
+
+      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+      val bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient)
+
+      val isBootstrappedTable = bootstrapIndex.useIndex()

Review comment:
   Good point. I didn't realize that this is being stored in 
`hoodie.properties` file.
   
   Ideally we would want to have a kind of check here, which can tell us `if 
any of the files is still bootstrapped or not`. If all the files written during 
bootstrap have had some `upsert` to it, it means that a regular hudi file has 
been written corresponding to each one of them. In such a case we would be able 
to simply use `spark's parquet datasource`, for querying which would be faster.
   
   Just wanted to bring to your notice. However, for now I can do what you are 
suggesting.
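   For illustration, a rough sketch of what such a check might look like, assuming the latest base files are listed through a `HoodieTableFileSystemView` and that a file counts as "still bootstrapped" while it still points at an external (source) base file. The helper name, the way the view is constructed, and the partition listing are assumptions for the sketch, not code from this PR:

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView

    import scala.collection.JavaConverters._

    object BootstrapCheckSketch {
      // Hypothetical helper: true if at least one latest base file still references
      // an external bootstrap base file, i.e. no upsert has rewritten it yet.
      def anyFileStillBootstrapped(metaClient: HoodieTableMetaClient, partitions: Seq[String]): Boolean = {
        val completedCommits = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
        val fsView = new HoodieTableFileSystemView(metaClient, completedCommits)
        partitions.exists { partition =>
          fsView.getLatestBaseFiles(partition).iterator().asScala
            .exists(_.getExternalBaseFile.isPresent)
        }
      }
    }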









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-06 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r466311768



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala
##
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+class HudiBootstrapRDD(@transient spark: SparkSession,

Review comment:
   I will change it to `hoodie`, but I thought in general the community was 
moving towards `hudi` as the naming convention.









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-06 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r466310309



##
File path: hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##
@@ -92,36 +102,69 @@ class IncrementalRelation(val sqlContext: SQLContext,
   override def schema: StructType = latestSchema
 
   override def buildScan(): RDD[Row] = {
-    val fileIdToFullPath = mutable.HashMap[String, String]()
+    val regularFileIdToFullPath = mutable.HashMap[String, String]()
+    var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
     for (commit <- commitsToReturn) {
       val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
         .get, classOf[HoodieCommitMetadata])
-      fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+
+      if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {

Review comment:
   @bvaradar can chime in here as well, so we can reach a conclusion.









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-06 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r466310024



##
File path: hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##
@@ -92,36 +102,69 @@ class IncrementalRelation(val sqlContext: SQLContext,
   override def schema: StructType = latestSchema
 
   override def buildScan(): RDD[Row] = {
-    val fileIdToFullPath = mutable.HashMap[String, String]()
+    val regularFileIdToFullPath = mutable.HashMap[String, String]()
+    var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
     for (commit <- commitsToReturn) {
       val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
         .get, classOf[HoodieCommitMetadata])
-      fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+
+      if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {

Review comment:
   I had discussed this with Balaji, and we decided to invest in this so that the user experience is the same regardless of whether the table is bootstrapped. Right now users can query the data written in the first commit through an incremental query, and we wanted to support the same experience on bootstrapped tables as well.

   A use case I can think of: a user needs to read all data up to a provided `end commit` (i.e. from the `first commit` to the provided `end commit`). This is already possible with incremental queries on regular Hudi tables, and we would want to support the same experience on bootstrapped tables.
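   For context, a minimal sketch of such an incremental read, assuming an existing SparkSession named `spark`; the begin/end instant times and the table path are placeholders:

    import org.apache.hudi.DataSourceReadOptions

    // Pull everything from the very first commit up to a chosen end commit.
    // "000" (earlier than any commit) and "20200806000000" are placeholder instant times.
    val incrementalDF = spark.read
      .format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, "20200806000000")
      .load("/path/to/hudi/table")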









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-04 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r465367893



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import scala.collection.JavaConverters._
+
+
+object HudiSparkUtils {
+
+  def getHudiMetadataSchema: StructType = {
+    StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
+      StructField(col, StringType, nullable = true)
+    }))
+  }
+
+  def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
+    paths.flatMap(path => {
+      val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
+      val globPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+      globPaths
+    })
+  }
+
+  def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {

Review comment:
   The common useful utilities have been contributed as part of https://github.com/apache/hudi/pull/1841









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-04 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r465367609



##
File path: hudi-client/pom.xml
##
@@ -101,6 +101,11 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>

Review comment:
   This class is used in `hudi-client` inside `BootstrapCommitActionExecutor`, so we cannot move it to `hudi-spark`.









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-04 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r465354934



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRelation.scala
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+/**
+  * This is a Spark relation that can be used for querying metadata/fully bootstrapped hudi tables, as well as
+  * non-bootstrapped tables. It implements the PrunedFilteredScan interface in order to support column pruning and
+  * filter push-down. For metadata bootstrapped files, if we query columns from both metadata and actual data then it
+  * will perform a merge of both to return the result.
+  *
+  * Caveat: Filter push-down does not work when querying both metadata and actual data columns over metadata
+  * bootstrapped files, because then the metadata file and data file can return a different number of rows, causing
+  * errors while merging.
+  *
+  * @param _sqlContext Spark SQL Context
+  * @param userSchema User specified schema in the datasource query
+  * @param globPaths Globbed paths obtained from the user provided path for querying
+  * @param metaClient Hudi table meta client
+  * @param optParams DataSource options passed by the user
+  */
+class HudiBootstrapRelation(@transient val _sqlContext: SQLContext,
+                            val userSchema: StructType,
+                            val globPaths: Seq[Path],
+                            val metaClient: HoodieTableMetaClient,
+                            val optParams: Map[String, String]) extends BaseRelation
+  with PrunedFilteredScan with Logging {
+
+  val skeletonSchema: StructType = HudiSparkUtils.getHudiMetadataSchema
+  var dataSchema: StructType = _
+  var fullSchema: StructType = _
+
+  val fileIndex: HudiBootstrapFileIndex = buildFileIndex()
+
+  override def sqlContext: SQLContext = _sqlContext
+
+  override val needConversion: Boolean = false
+
+  override def schema: StructType = inferFullSchema()
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    logInfo("Starting scan..")
+
+    // Compute splits
+    val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => {
+      var skeletonFile: Option[PartitionedFile] = Option.empty
+      var dataFile: PartitionedFile = null
+
+      if (hoodieBaseFile.getExternalBaseFile.isPresent) {
+        skeletonFile = Option(PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen))
+        dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getExternalBaseFile.get().getPath, 0,
+          hoodieBaseFile.getExternalBaseFile.get().getFileLen)
+      } else {
+        dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen)
+      }
+      HudiBootstrapSplit(dataFile, skeletonFile)
+    })
+    val tableState = HudiBootstrapTableState(bootstrapSplits)
+
+    // Get required schemas for column pruning
+    var requiredDataSchema = StructType(Seq())
+    var requiredSkeletonSchema = StructType(Seq())
+    requiredColumns.foreach(col => {
+      var field = dataSchema.find(_.name == col)
+      if (field.isDefined) {
+        requiredDataSchema = requiredDataSchema.add(field.get)
+      } else {
+        field = skeletonSchema.find(_.name == col)
+        requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-04 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r465353608



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -54,29 +58,54 @@ class DefaultSource extends RelationProvider
     val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)

     val path = parameters.get("path")
-    if (path.isEmpty) {
-      throw new HoodieException("'path' must be specified.")
-    }

     if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
-      // this is just effectively RO view only, where `path` can contain a mix of
-      // non-hoodie/hoodie path files. set the path filter up
-      sqlContext.sparkContext.hadoopConfiguration.setClass(
-        "mapreduce.input.pathFilter.class",
-        classOf[HoodieROTablePathFilter],
-        classOf[org.apache.hadoop.fs.PathFilter])
-
-      log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
-      log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
-        "Please query the Hive table registered using Spark SQL.")
-      // simply return as a regular parquet relation
-      DataSource.apply(
-        sparkSession = sqlContext.sparkSession,
-        userSpecifiedSchema = Option(schema),
-        className = "parquet",
-        options = parameters)
-        .resolveRelation()
+      val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)

Review comment:
   These additional paths are being used in the **Incremental query** code to make it work for bootstrapped tables. I need to pass a list of bootstrapped files to read, and that is why I had to add support for reading from multiple paths. `spark.read.parquet` already has that kind of support and is already used in the **incremental relation** to read a list of files.
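   As an aside, a minimal sketch of how a comma-separated list of paths can be supplied through this option, assuming an existing SparkSession named `spark`; the file paths are placeholders:

    import org.apache.hudi.DataSourceReadOptions

    // READ_PATHS_OPT_KEY takes a comma-separated list of file paths to read,
    // in addition to (or instead of) the usual "path" option.
    val bootstrapFilesDF = spark.read
      .format("org.apache.hudi")
      .option(DataSourceReadOptions.READ_PATHS_OPT_KEY,
        "s3://bucket/table/2020/08/06/file1.parquet,s3://bucket/table/2020/08/06/file2.parquet")
      .load()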









[GitHub] [hudi] umehrot2 commented on a change in pull request #1702: [HUDI-426] Bootstrap datasource integration

2020-08-04 Thread GitBox


umehrot2 commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r465352046



##
File path: hudi-client/pom.xml
##
@@ -101,6 +101,11 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>

Review comment:
   I have explained the reason here: https://github.com/apache/hudi/pull/1876/#discussion_r463889083. Let me know your thoughts.








