[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-08-06 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r466829856



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -132,11 +132,15 @@ class DefaultSource extends RelationProvider
 
 log.info("Constructing hoodie (as parquet) data source with options :" + 
optParams)
 // simply return as a regular parquet relation
-DataSource.apply(
+val relation =  DataSource.apply(
   sparkSession = sqlContext.sparkSession,
   userSpecifiedSchema = Option(schema),
   className = "parquet",
   options = optParams)
   .resolveRelation()
+
+
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")

Review comment:
   Yes, that makes sense. We used to unset this before the incremental query. When 
we only had two data source query types, it was fine to unset it before running the 
incremental query, but now we have to unset it here as well. Still wondering why the 
local build is able to pass, though...
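
   A minimal sketch of the behavior being described, assuming only the property name 
shown in the diff above: the Hadoop configuration is shared per SparkContext, so a 
path filter installed by an earlier incremental query would otherwise leak into the 
snapshot read path unless it is cleared first.

   import org.apache.spark.sql.SQLContext

   def clearLeftoverPathFilter(sqlContext: SQLContext): Unit = {
     // The configuration is shared across queries on the same SparkContext.
     val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
     // Remove any path filter class a previous query type may have installed.
     hadoopConf.unset("mapreduce.input.pathFilter.class")
   }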





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-08-06 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r466167704



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##
@@ -27,9 +27,9 @@ import org.apache.spark.sql.types.{StringType, StructField, 
StructType}
 import scala.collection.JavaConverters._
 
 
-object HudiSparkUtils {
+object HoodieSparkUtils {

Review comment:
   Never mind, this rename was intentional, based on previous review comments, to 
keep naming consistent with the other classes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-08-06 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r466166224



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##
@@ -27,9 +27,9 @@ import org.apache.spark.sql.types.{StringType, StructField, 
StructType}
 import scala.collection.JavaConverters._
 
 
-object HudiSparkUtils {
+object HoodieSparkUtils {

Review comment:
   @vinothchandar Looks like some unrelated change was added during the 
rebase. Maybe this is related to the issue.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-08-04 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r464842847



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, 
TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, 
UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Try
+
+case class HoodieMergeOnReadPartition(index: Int, split: 
HoodieMergeOnReadFileSplit) extends Partition
+
+class HoodieMergeOnReadRDD(@transient sc: SparkContext,
+   @transient config: Configuration,
+   fullSchemaFileReader: PartitionedFile => 
Iterator[Any],
+   requiredSchemaFileReader: PartitionedFile => 
Iterator[Any],
+   tableState: HoodieMergeOnReadTableState)
+  extends RDD[InternalRow](sc, Nil) {
+
+  private val confBroadcast = sc.broadcast(new SerializableWritable(config))
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
+mergeParquetPartition.split match {
+  case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader)
+  case skipMergeSplit if skipMergeSplit.mergeType
+.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
+skipMergeFileIterator(
+  skipMergeSplit,
+  read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader),
+  getConfig
+)
+  case payloadCombineSplit if payloadCombineSplit.mergeType
+.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
+payloadCombineFileIterator(
+  payloadCombineSplit,
+  read(mergeParquetPartition.split.dataFile, fullSchemaFileReader),

Review comment:
   Yes, there is still some room for improvement. Spark schema handling is tricky: 
when Spark passes the required columns into the `PrunedFilteredScan`, the column 
order differs from the actual table schema, and the rows get reordered somewhere 
else before being returned to the user. As a result, the projected InternalRow and 
the projected Avro record end up with different field orders (see the sketch below). 
I will look into this but am not sure I can get it into this release.
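
   A hedged sketch (assumed helper, not the PR's code) of one way to handle the 
ordering mismatch on the Avro side: rebuild the record against the target schema so 
its field order matches the projected Spark schema, using GenericRecordBuilder as 
hinted by the imports in this file.

   import org.apache.avro.Schema
   import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
   import scala.collection.JavaConverters._

   def reorderToSchema(record: GenericRecord, targetSchema: Schema): GenericRecord = {
     val builder = new GenericRecordBuilder(targetSchema)
     // Copy each target field by name; the positions may differ between the schemas.
     targetSchema.getFields.asScala.foreach { field =>
       builder.set(field, record.get(field.name()))
     }
     builder.build()
   }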





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-23 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r459864183



##
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+logPaths: Option[List[String]],
+latestCommit: String,
+tablePath: String,
+maxCompactionMemoryInBytes: Long,
+skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+val optParams: Map[String, String],
+val userSchema: StructType,
+val globPaths: Seq[Path],
+val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+val schemaUtil = new TableSchemaResolver(metaClient)
+val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+val parquetReaderFunction = new 
ParquetFileFormat().buildReaderWithPartitionValues(
+  sparkSession = sqlContext.sparkSession,
+  dataSchema = latestSchema,
+  partitionSchema = StructType(Nil),
+  requiredSchema = latestSchema,

Review comment:
   After thinking about it for a while, I think we can handle it this way (see the 
projection sketch after this list):

    BaseFileOnly
   - Use the base file reader with the user-specified schema.

    Unmerge
   - Use the base file reader with the user-specified schema.
   - Convert each log record to `InternalRow`, then project it to the correct schema 
     before exiting the `unMergeFileIterator` (or the other way around).

    Merge
   - Use the base file reader with the full schema.
   - Merge the two records in Avro.
   - Convert to `InternalRow`, then project to the correct schema (or the other way around).

   Since `InternalRow` needs field positions to extract values and the schema can be 
nested, this could get complicated once nested columns are involved.
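
   A rough projection sketch under the assumptions above (flat, non-nested schemas and 
hypothetical helper names), showing how a full-schema `InternalRow` could be narrowed 
to the required columns by position:

   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
   import org.apache.spark.sql.types.StructType

   def projectToRequiredSchema(fullSchema: StructType,
                               requiredSchema: StructType,
                               row: InternalRow): InternalRow = {
     // Position of each required field inside the full schema.
     val positions = requiredSchema.fieldNames.map(fullSchema.fieldIndex)
     // Pull the values out in the required order; nested types would need more care.
     val values = positions.map(pos => row.get(pos, fullSchema(pos).dataType))
     val projection = UnsafeProjection.create(requiredSchema)
     projection(InternalRow.fromSeq(values))
   }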





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-23 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r459742568



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -58,26 +61,20 @@ class DefaultSource extends RelationProvider
   throw new HoodieException("'path' must be specified.")
 }
 
+val fs = FSUtils.getFs(path.get, 
sqlContext.sparkContext.hadoopConfiguration)
+val globPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get), 
fs)
+val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)

Review comment:
   I think I can move this into the SNAPSHOT_QUERY section.

##
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+logPaths: Option[List[String]],
+latestCommit: String,
+tablePath: String,
+maxCompactionMemoryInBytes: Long,
+skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+val optParams: Map[String, String],
+val userSchema: StructType,
+val globPaths: Seq[Path],
+val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+val schemaUtil = new TableSchemaResolver(metaClient)
+val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+val parquetReaderFunction = new 
ParquetFileFormat().buildReaderWithPartitionValues(
+  sparkSession = sqlContext.sparkSession,
+  dataSchema = latestSchema,
+  partitionSchema = StructType(Nil),
+  requiredSchema = latestSchema,

Review comment:
   When I push down nothing and pass the full schema as the user-requested 
schema, simply changing from `TableScan` to `PrunedFilteredScan` changes the 
behavior of the parquet reader, and it no longer reads the correct schema. 
I need to dig deeper here (a rough `PrunedFilteredScan` sketch is below).
   Let's focus on making the basic functionality work in this PR; I will figure 
this out in a follow-up PR.
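
   A minimal sketch of the `PrunedFilteredScan` shape being discussed (a placeholder 
relation with assumed names, not the PR's implementation): Spark then supplies the 
pruned column list and any pushable filters instead of calling a no-argument 
`buildScan()`.

   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.{Row, SQLContext}
   import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
   import org.apache.spark.sql.types.StructType

   class PrunedSnapshotRelationSketch(val sqlContext: SQLContext,
                                      fullSchema: StructType)
     extends BaseRelation with PrunedFilteredScan {

     override def schema: StructType = fullSchema

     // Spark passes the columns it actually needs plus the filters it can push down.
     override def buildScan(requiredColumns: Array[String],
                            filters: Array[Filter]): RDD[Row] = {
       // A first cut could ignore pruning and still read the full schema; the catch
       // noted above is that the reader's output column order may then differ from
       // what Spark expects for the pruned schema.
       sqlContext.sparkContext.emptyRDD[Row]
     }
   }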

##
File 

[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-23 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r459230287



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+ broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+   config: Configuration,
+   dataReadFunction: PartitionedFile => Iterator[Any],
+   dataSchema: StructType,
+   hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+this(
+  sc,
+  sc.broadcast(new HadoopSerializableConfiguration(config))
+  .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+  dataReadFunction,
+  dataSchema,
+  hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+mergeParquetPartition.split match {
+  case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+read(mergeParquetPartition.split.dataFile, dataReadFunction)
+  case unMergeSplit if unMergeSplit.skipMerge =>
+unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+  case mergeSplit if !mergeSplit.skipMerge =>
+mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+  case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+}
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+val fileIterator = readFileFunction(partitionedFile)
+val rows = fileIterator.flatMap(_ match {
+  case r: InternalRow => Seq(r)
+  case b: ColumnarBatch => b.rowIterator().asScala
+})
+rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+new Iterator[InternalRow] {
+  private val dataFileIterator = read(split.dataFile, readFileFunction)
+  private val logSchema = getLogAvroSchema(split)
+  private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+  private val converter = new AvroDeserializer(logSchema, sparkTypes)
+  private val hudiLogRecords = scanLog(split, 

[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-22 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458544915



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##
@@ -110,6 +112,10 @@ object DataSourceReadOptions {
*/
   val INCR_PATH_GLOB_OPT_KEY = "hoodie.datasource.read.incr.path.glob"
   val DEFAULT_INCR_PATH_GLOB_OPT_VAL = ""
+
+
+  val REALTIME_SKIP_MERGE_KEY = REALTIME_SKIP_MERGE_PROP

Review comment:
   Added `MERGE_ON_READ_PAYLOAD_KEY` and `MERGE_ON_READ_ORDERING_KEY`. We then use 
the payload class to do all the merging; a rough sketch of payload-based merging is below.
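
   A hedged sketch (not the PR's code) of what payload-based merging looks like: the 
configured payload class decides how a log record combines with the base-file record, 
which is what these new options are meant to control.

   import org.apache.avro.Schema
   import org.apache.avro.generic.IndexedRecord
   import org.apache.hudi.common.model.HoodieRecordPayload

   def mergeWithPayload(baseRecord: IndexedRecord,
                        logPayload: HoodieRecordPayload[_],
                        schema: Schema): Option[IndexedRecord] = {
     // combineAndGetUpdateValue returns an empty Option when the record is deleted.
     val merged = logPayload.combineAndGetUpdateValue(baseRecord, schema)
     if (merged.isPresent) Some(merged.get()) else None
   }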

##
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+logPaths: Option[List[String]],
+latestCommit: String,
+tablePath: String,
+maxCompactionMemoryInBytes: Long,
+skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+val optParams: Map[String, String],
+val userSchema: StructType,
+val globPaths: Seq[Path],
+val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{

Review comment:
   https://issues.apache.org/jira/browse/HUDI-1050

##
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##
@@ -40,6 +40,8 @@
 import java.io.IOException;
 import java.util.Map;
 
+import static 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes;

Review comment:
   done

##
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HadoopSerializableConfiguration.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.config;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class HadoopSerializableConfiguration implements Serializable {

Review comment:
   

[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-21 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458247881



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+ broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+   config: Configuration,
+   dataReadFunction: PartitionedFile => Iterator[Any],
+   dataSchema: StructType,
+   hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+this(
+  sc,
+  sc.broadcast(new HadoopSerializableConfiguration(config))
+  .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+  dataReadFunction,
+  dataSchema,
+  hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+mergeParquetPartition.split match {
+  case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+read(mergeParquetPartition.split.dataFile, dataReadFunction)
+  case unMergeSplit if unMergeSplit.skipMerge =>
+unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+  case mergeSplit if !mergeSplit.skipMerge =>
+mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+  case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+}
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+val fileIterator = readFileFunction(partitionedFile)
+val rows = fileIterator.flatMap(_ match {
+  case r: InternalRow => Seq(r)
+  case b: ColumnarBatch => b.rowIterator().asScala
+})
+rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+new Iterator[InternalRow] {
+  private val dataFileIterator = read(split.dataFile, readFileFunction)
+  private val logSchema = getLogAvroSchema(split)
+  private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+  private val converter = new AvroDeserializer(logSchema, sparkTypes)
+  private val hudiLogRecords = scanLog(split, 

[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-21 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458245661



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+ broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+   config: Configuration,
+   dataReadFunction: PartitionedFile => Iterator[Any],
+   dataSchema: StructType,
+   hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+this(
+  sc,
+  sc.broadcast(new HadoopSerializableConfiguration(config))
+  .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+  dataReadFunction,
+  dataSchema,
+  hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+mergeParquetPartition.split match {
+  case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+read(mergeParquetPartition.split.dataFile, dataReadFunction)
+  case unMergeSplit if unMergeSplit.skipMerge =>
+unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+  case mergeSplit if !mergeSplit.skipMerge =>
+mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+  case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+}
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+val fileIterator = readFileFunction(partitionedFile)
+val rows = fileIterator.flatMap(_ match {
+  case r: InternalRow => Seq(r)
+  case b: ColumnarBatch => b.rowIterator().asScala
+})
+rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+new Iterator[InternalRow] {
+  private val dataFileIterator = read(split.dataFile, readFileFunction)
+  private val logSchema = getLogAvroSchema(split)
+  private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+  private val converter = new AvroDeserializer(logSchema, sparkTypes)
+  private val hudiLogRecords = scanLog(split, 

[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-21 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458242933



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+ broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+   config: Configuration,
+   dataReadFunction: PartitionedFile => Iterator[Any],
+   dataSchema: StructType,
+   hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+this(
+  sc,
+  sc.broadcast(new HadoopSerializableConfiguration(config))
+  .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+  dataReadFunction,
+  dataSchema,
+  hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+mergeParquetPartition.split match {
+  case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+read(mergeParquetPartition.split.dataFile, dataReadFunction)
+  case unMergeSplit if unMergeSplit.skipMerge =>
+unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+  case mergeSplit if !mergeSplit.skipMerge =>
+mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+  case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+}
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+val fileIterator = readFileFunction(partitionedFile)
+val rows = fileIterator.flatMap(_ match {
+  case r: InternalRow => Seq(r)
+  case b: ColumnarBatch => b.rowIterator().asScala
+})
+rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+new Iterator[InternalRow] {
+  private val dataFileIterator = read(split.dataFile, readFileFunction)
+  private val logSchema = getLogAvroSchema(split)
+  private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+  private val converter = new AvroDeserializer(logSchema, sparkTypes)
+  private val hudiLogRecords = scanLog(split, 

[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-21 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458241891



##
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, 
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, 
HoodieRealtimeConfig}
+import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: 
HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+ broadcastedConf: 
Broadcast[HadoopSerializableConfiguration],
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ hudiRealtimeFileSplits: 
List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+   config: Configuration,
+   dataReadFunction: PartitionedFile => Iterator[Any],
+   dataSchema: StructType,
+   hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+this(
+  sc,
+  sc.broadcast(new HadoopSerializableConfiguration(config))
+  .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+  dataReadFunction,
+  dataSchema,
+  hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+mergeParquetPartition.split match {
+  case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+read(mergeParquetPartition.split.dataFile, dataReadFunction)
+  case unMergeSplit if unMergeSplit.skipMerge =>
+unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+  case mergeSplit if !mergeSplit.skipMerge =>
+mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+  case _ => throw new HoodieException("Unable to select an Iterator to 
read the Hudi MOR File Split")
+}
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+hudiRealtimeFileSplits.zipWithIndex.map(file => 
HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+   readFileFunction: PartitionedFile => Iterator[Any]): 
Iterator[InternalRow] = {
+val fileIterator = readFileFunction(partitionedFile)
+val rows = fileIterator.flatMap(_ match {
+  case r: InternalRow => Seq(r)
+  case b: ColumnarBatch => b.rowIterator().asScala
+})
+rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+  readFileFunction: PartitionedFile => 
Iterator[Any]): Iterator[InternalRow] =
+new Iterator[InternalRow] {
+  private val dataFileIterator = read(split.dataFile, readFileFunction)
+  private val logSchema = getLogAvroSchema(split)
+  private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+  private val converter = new AvroDeserializer(logSchema, sparkTypes)
+  private val hudiLogRecords = scanLog(split, 

[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-21 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458237536



##
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+logPaths: Option[List[String]],
+latestCommit: String,
+tablePath: String,
+maxCompactionMemoryInBytes: Long,
+skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+val optParams: Map[String, String],
+val userSchema: StructType,
+val globPaths: Seq[Path],
+val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+val schemaUtil = new TableSchemaResolver(metaClient)
+val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+val parquetReaderFunction = new 
ParquetFileFormat().buildReaderWithPartitionValues(

Review comment:
   I think we can introduce a `HoodieLogFileFormat` to read log files in the 
future. Wrapping the `FileFormat` here gives us a lot of flexibility to adopt 
other formats like ORC. Wrapping inside the `FileFormat` would also be possible by 
overriding `buildReaderWithPartitionValues` and calling 
`super.buildReaderWithPartitionValues` to get the iterator (a rough sketch of that 
approach is below). The downside is that we would probably need two separate 
classes to handle ORC and Parquet.
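
   A rough sketch of the override alternative under stated assumptions (hypothetical 
class name, no real log-merging logic): subclass `ParquetFileFormat`, obtain the 
base-file iterator from `super`, and wrap it; an ORC variant would need its own 
subclass, which is the downside mentioned above.

   import org.apache.hadoop.conf.Configuration
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.execution.datasources.PartitionedFile
   import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
   import org.apache.spark.sql.sources.Filter
   import org.apache.spark.sql.types.StructType

   class HoodieMergeOnReadParquetFormatSketch extends ParquetFileFormat {

     override def buildReaderWithPartitionValues(
         sparkSession: SparkSession,
         dataSchema: StructType,
         partitionSchema: StructType,
         requiredSchema: StructType,
         filters: Seq[Filter],
         options: Map[String, String],
         hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
       // Base-file reader from the stock parquet implementation.
       val parquetReader = super.buildReaderWithPartitionValues(
         sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
       (file: PartitionedFile) => {
         val baseRows = parquetReader(file)
         // A real implementation would locate the file slice's log files here and
         // merge their records with baseRows; this sketch just returns the base rows.
         baseRows
       }
     }
   }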





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-21 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458232450



##
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+logPaths: Option[List[String]],
+latestCommit: String,
+tablePath: String,
+maxCompactionMemoryInBytes: Long,
+skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+val optParams: Map[String, String],
+val userSchema: StructType,
+val globPaths: Seq[Path],
+val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+val schemaUtil = new TableSchemaResolver(metaClient)
+val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+val parquetReaderFunction = new 
ParquetFileFormat().buildReaderWithPartitionValues(
+  sparkSession = sqlContext.sparkSession,
+  dataSchema = latestSchema,
+  partitionSchema = StructType(Nil),
+  requiredSchema = latestSchema,
+  filters = Seq.empty,

Review comment:
   This field requires a `Seq[Filter]`. With `PrunedFilteredScan` we can simply 
forward whatever Spark passes to `buildScan(requiredColumns, filters)` here. 
For now this field can be an empty `Seq`.
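
   As a hedged sketch of that wiring once the relation mixes in `PrunedFilteredScan` (only the signature plumbing is shown; the rest of the relation is assumed to stay as above, and the trait name is hypothetical):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

// Hypothetical variant of the relation; latestSchema and sqlContext come from the class body.
trait SnapshotRelationWithPruning extends BaseRelation with PrunedFilteredScan {
  def latestSchema: StructType

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val parquetReaderFunction = new ParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession = sqlContext.sparkSession,
      dataSchema = latestSchema,
      partitionSchema = StructType(Nil),
      requiredSchema = latestSchema,   // column pruning is a follow-up; read the full schema for now
      filters = filters.toSeq,         // forward Spark's pushed-down filters as-is
      options = Map.empty[String, String],
      hadoopConf = sqlContext.sparkContext.hadoopConfiguration)
    // Build the RDD from parquetReaderFunction exactly as in the TableScan version (omitted).
    ???
  }
}
```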





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-21 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458230756



##
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+logPaths: Option[List[String]],
+latestCommit: String,
+tablePath: String,
+maxCompactionMemoryInBytes: Long,
+skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+val optParams: Map[String, String],
+val userSchema: StructType,
+val globPaths: Seq[Path],
+val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+val schemaUtil = new TableSchemaResolver(metaClient)
+val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+val parquetReaderFunction = new 
ParquetFileFormat().buildReaderWithPartitionValues(
+  sparkSession = sqlContext.sparkSession,
+  dataSchema = latestSchema,
+  partitionSchema = StructType(Nil),
+  requiredSchema = latestSchema,

Review comment:
   This is related to `PrunedFilteredScan`. If we don't read all fields here, 
we need to support merging two records with different schemas. This is 
targeted for the 0.6.0 release.
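
   For illustration, the pruned read schema could be derived roughly like this (hypothetical helper, not part of this PR); the merge path would then have to reconcile rows read with this narrower schema against log records written with the full schema:

```scala
import org.apache.spark.sql.types.StructType

// Build the pruned read schema from the columns Spark asks for, keeping their declared types.
def pruneSchema(fullSchema: StructType, requiredColumns: Array[String]): StructType = {
  val nameToField = fullSchema.fields.map(f => f.name -> f).toMap
  StructType(requiredColumns.flatMap(nameToField.get))
}
```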





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-21 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458224048



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##
@@ -110,6 +112,10 @@ object DataSourceReadOptions {
*/
   val INCR_PATH_GLOB_OPT_KEY = "hoodie.datasource.read.incr.path.glob"
   val DEFAULT_INCR_PATH_GLOB_OPT_VAL = ""
+
+
+  val REALTIME_SKIP_MERGE_KEY = REALTIME_SKIP_MERGE_PROP

Review comment:
   Agreed. Maybe something like `SNAPSHOT_READ_STRATEGY`? Then we can control 
the logic for `merge`, `unmerge`, `mergeWithBootstrap`, etc.
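
   For example, such an option could look like the sketch below (the key name and values are only a suggestion, not necessarily what the PR ends up with):

```scala
// Hypothetical read option controlling how a MOR snapshot query combines base and log files.
val SNAPSHOT_READ_STRATEGY_OPT_KEY = "hoodie.datasource.read.snapshot.strategy"
val SNAPSHOT_READ_STRATEGY_MERGE_VAL = "merge"              // combine log records into base records
val SNAPSHOT_READ_STRATEGY_UNMERGE_VAL = "unmerge"          // read base and log records without combining
val SNAPSHOT_READ_STRATEGY_BOOTSTRAP_MERGE_VAL = "mergeWithBootstrap"
val DEFAULT_SNAPSHOT_READ_STRATEGY_VAL = SNAPSHOT_READ_STRATEGY_MERGE_VAL
```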





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-21 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458215093



##
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
##
@@ -147,12 +146,4 @@ public Schema getWriterSchema() {
   public Schema getHiveSchema() {
 return hiveSchema;
   }
-
-  public long getMaxCompactionMemoryInBytes() {

Review comment:
   Moved to a utils class; we need to call this method from 
`HoodieMergeOnReadRDD`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-21 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r458213831



##
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HadoopSerializableConfiguration.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.config;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class HadoopSerializableConfiguration implements Serializable {

Review comment:
   Didn't know this. Will switch over.
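
   For reference, such a serializable wrapper around a Hadoop `Configuration` typically looks like the sketch below (a minimal illustration of the pattern the existing utility already provides, not the exact class being replaced):

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

// Configuration itself is not Serializable, so serialize it through its Writable methods.
class SerializableHadoopConfiguration(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)          // Configuration implements Writable
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}
```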





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

2020-07-20 Thread GitBox


garyli1019 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r457844997



##
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+logPaths: Option[List[String]],
+latestCommit: String,
+tablePath: String,
+maxCompactionMemoryInBytes: Long,
+skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+val optParams: Map[String, String],
+val userSchema: StructType,
+val globPaths: Seq[Path],
+val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{

Review comment:
   `PrunedFilteredScan` changes the behavior of the `ParquetRecordReader` 
inside `ParquetFileFormat` even when we are not using the vectorized reader. 
Still trying to figure out why... I will follow up on `PrunedFilteredScan` in 
a separate PR.
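
   For context, the two scan interfaces only differ in which `buildScan` variant Spark calls; a hedged debugging aid while chasing this (an assumption, not a fix) is to force the row-based parquet reader so vectorization can be ruled out:

```scala
import org.apache.spark.sql.SparkSession

// TableScan:          def buildScan(): RDD[Row]
// PrunedFilteredScan: def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

// Force the row-based parquet reader to take vectorization out of the picture while debugging.
def disableVectorizedReader(spark: SparkSession): Unit =
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
```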





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org