[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r466829856 ## File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala ## @@ -132,11 +132,15 @@ class DefaultSource extends RelationProvider log.info("Constructing hoodie (as parquet) data source with options :" + optParams) // simply return as a regular parquet relation -DataSource.apply( +val relation = DataSource.apply( sparkSession = sqlContext.sparkSession, userSpecifiedSchema = Option(schema), className = "parquet", options = optParams) .resolveRelation() + + sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class") Review comment: Yes, makes sense. We unset this before the incremental query. When we only had two data source query types, it was fine to unset it right before running the incremental query, but now we have to unset it here. Still wondering why the local build is able to pass, though... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r466167704 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala ## @@ -27,9 +27,9 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import scala.collection.JavaConverters._ -object HudiSparkUtils { +object HoodieSparkUtils { Review comment: Never mind, this rename was intentional: based on previous review comments, it keeps the naming consistent with the other classes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r466166224 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala ## @@ -27,9 +27,9 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import scala.collection.JavaConverters._ -object HudiSparkUtils { +object HoodieSparkUtils { Review comment: @vinothchandar It looks like some unrelated changes were added during the rebase. Maybe this is related to the issue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r464842847 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala ## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} +import org.apache.hadoop.conf.Configuration +import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.Try + +case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition + +class HoodieMergeOnReadRDD(@transient sc: SparkContext, + @transient config: Configuration, + fullSchemaFileReader: PartitionedFile => Iterator[Any], + requiredSchemaFileReader: PartitionedFile => Iterator[Any], + tableState: HoodieMergeOnReadTableState) + extends RDD[InternalRow](sc, Nil) { + + private val confBroadcast = sc.broadcast(new SerializableWritable(config)) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition] +mergeParquetPartition.split match { + case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty => +read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader) + case skipMergeSplit if skipMergeSplit.mergeType +.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => 
+skipMergeFileIterator( + skipMergeSplit, + read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader), + getConfig +) + case payloadCombineSplit if payloadCombineSplit.mergeType +.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => +payloadCombineFileIterator( + payloadCombineSplit, + read(mergeParquetPartition.split.dataFile, fullSchemaFileReader), Review comment: Yes, there is still some room for improvement. Spark's schema handling is tricky. When Spark passes the required columns into the `PrunedFilteredScan`, the order of the columns is different from the actual schema, and they get reordered somewhere else before being returned to the user. As a result, the projected InternalRow and the projected Avro record have their columns in different orders. I will look into this, but I'm not sure I'll be able to get it into this release. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r459864183 ## File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile, +logPaths: Option[List[String]], +latestCommit: String, +tablePath: String, +maxCompactionMemoryInBytes: Long, +skipMerge: Boolean) + +class SnapshotRelation (val sqlContext: SQLContext, +val optParams: Map[String, String], +val userSchema: StructType, +val globPaths: Seq[Path], +val metaClient: HoodieTableMetaClient) + extends BaseRelation with TableScan with Logging{ + + private val conf = sqlContext.sparkContext.hadoopConfiguration + + // use schema from latest metadata, if not present, read schema from the data file + private val latestSchema = { +val schemaUtil = new TableSchemaResolver(metaClient) +val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields) +AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) + } + + private val skipMerge = optParams.getOrElse( +DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY, 
+DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(conf)) + private val fileIndex = buildFileIndex() + + override def schema: StructType = latestSchema + + override def needConversion: Boolean = false + + override def buildScan(): RDD[Row] = { +val parquetReaderFunction = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = sqlContext.sparkSession, + dataSchema = latestSchema, + partitionSchema = StructType(Nil), + requiredSchema = latestSchema, Review comment: After thinking for a while, I think we can handle it this way. BaseFileOnly: use the base file reader with the user-specified schema. Unmerge: use the base file reader with the user-specified schema; convert the log records to `InternalRow`, then extract the correct schema before exiting the `unMergeFileIterator` (or the other way around). Merge: use the base file reader with the full schema; merge the two records in Avro; convert to `InternalRow`, then extract the correct schema (or the other way around). `InternalRow` needs the field position to extract a value, and the schema could be nested, so this could get complicated once nested columns are involved. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r459742568 ## File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala ## @@ -58,26 +61,20 @@ class DefaultSource extends RelationProvider throw new HoodieException("'path' must be specified.") } +val fs = FSUtils.getFs(path.get, sqlContext.sparkContext.hadoopConfiguration) +val globPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get), fs) +val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray) +val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath) Review comment: I think I can move this into the SNAPSHOT_QUERY branch. ## File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi + +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile, +logPaths: Option[List[String]], +latestCommit: String, +tablePath: String, +maxCompactionMemoryInBytes: Long, +skipMerge: Boolean) + +class SnapshotRelation (val sqlContext: SQLContext, +val optParams: Map[String, String], +val userSchema: StructType, +val globPaths: Seq[Path], +val metaClient: HoodieTableMetaClient) + extends BaseRelation with TableScan with Logging{ + + private val conf = sqlContext.sparkContext.hadoopConfiguration + + // use schema from latest metadata, if not present, read schema from the data file + private val latestSchema = { +val schemaUtil = new TableSchemaResolver(metaClient) +val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields) +AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) + } + + private val skipMerge = optParams.getOrElse( +DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY, 
+DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(conf)) + private val fileIndex = buildFileIndex() + + override def schema: StructType = latestSchema + + override def needConversion: Boolean = false + + override def buildScan(): RDD[Row] = { +val parquetReaderFunction = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = sqlContext.sparkSession, + dataSchema = latestSchema, + partitionSchema = StructType(Nil), + requiredSchema = latestSchema, Review comment: Suppose I push down nothing and pass the full schema as the user-requested schema. Simply changing from `TableScan` to `PrunedFilteredScan` still changes the behavior of the parquet reader, and it no longer reads the correct schema. I need to dig deeper here. Let's focus on making the basic functionality work in this PR; I will figure this out in a follow-up PR. ## File p
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r459230287 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig} +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS + +import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import scala.util.Try + +case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition + +class HudiMergeOnReadRDD(sc: SparkContext, + broadcastedConf: Broadcast[HadoopSerializableConfiguration], + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) + extends RDD[InternalRow](sc, Nil) { + + // Broadcast the hadoop Configuration to executors. 
+ def this(sc: SparkContext, + config: Configuration, + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = { +this( + sc, + sc.broadcast(new HadoopSerializableConfiguration(config)) + .asInstanceOf[Broadcast[HadoopSerializableConfiguration]], + dataReadFunction, + dataSchema, + hudiRealtimeFileSplits) + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition] +mergeParquetPartition.split match { + case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty => +read(mergeParquetPartition.split.dataFile, dataReadFunction) + case unMergeSplit if unMergeSplit.skipMerge => +unMergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case mergeSplit if !mergeSplit.skipMerge => +mergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split") +} + } + + override protected def getPartitions: Array[Partition] = { +hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray + } + + private def getConfig(): Configuration = { +broadcastedConf.value.config + } + + private def read(partitionedFile: PartitionedFile, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = { +val fileIterator = readFileFunction(partitionedFile) +val rows = fileIterator.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala +}) +rows + } + + private def unMergeFileIterator(split: HudiMergeOnReadFileSplit, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = +new Iterator[InternalRow] { + private val dataFileIterator = read(split.dataFile, readFileFunction) + private val logSchema = getLogAvroSchema(split) + private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType] + private val converter = new AvroDeserializer(logSchema, sparkTypes) + private val hudiLogRecords = scanLog(split,
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458544915 ## File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala ## @@ -110,6 +112,10 @@ object DataSourceReadOptions { */ val INCR_PATH_GLOB_OPT_KEY = "hoodie.datasource.read.incr.path.glob" val DEFAULT_INCR_PATH_GLOB_OPT_VAL = "" + + + val REALTIME_SKIP_MERGE_KEY = REALTIME_SKIP_MERGE_PROP Review comment: added `MERGE_ON_READ_PAYLOAD_KEY` and `MERGE_ON_READ_ORDERING_KEY`. Then we use the payload to do all the merging. ## File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile, +logPaths: Option[List[String]], +latestCommit: String, +tablePath: String, +maxCompactionMemoryInBytes: Long, +skipMerge: Boolean) + +class SnapshotRelation (val sqlContext: SQLContext, +val optParams: Map[String, String], +val userSchema: StructType, +val globPaths: Seq[Path], +val metaClient: HoodieTableMetaClient) + extends BaseRelation with TableScan with Logging{ Review comment: https://issues.apache.org/jira/browse/HUDI-1050 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java ## @@ -40,6 +40,8 @@ import java.io.IOException; import java.util.Map; +import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes; Review comment: done ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HadoopSerializableConfiguration.java ## @@ -0,0 +1,43 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.config; + +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +public class HadoopSerializableConfiguration implements Serializable { Review comment: d
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458247881 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig} +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS + +import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import scala.util.Try + +case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition + +class HudiMergeOnReadRDD(sc: SparkContext, + broadcastedConf: Broadcast[HadoopSerializableConfiguration], + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) + extends RDD[InternalRow](sc, Nil) { + + // Broadcast the hadoop Configuration to executors. 
+ def this(sc: SparkContext, + config: Configuration, + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = { +this( + sc, + sc.broadcast(new HadoopSerializableConfiguration(config)) + .asInstanceOf[Broadcast[HadoopSerializableConfiguration]], + dataReadFunction, + dataSchema, + hudiRealtimeFileSplits) + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition] +mergeParquetPartition.split match { + case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty => +read(mergeParquetPartition.split.dataFile, dataReadFunction) + case unMergeSplit if unMergeSplit.skipMerge => +unMergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case mergeSplit if !mergeSplit.skipMerge => +mergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split") +} + } + + override protected def getPartitions: Array[Partition] = { +hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray + } + + private def getConfig(): Configuration = { +broadcastedConf.value.config + } + + private def read(partitionedFile: PartitionedFile, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = { +val fileIterator = readFileFunction(partitionedFile) +val rows = fileIterator.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala +}) +rows + } + + private def unMergeFileIterator(split: HudiMergeOnReadFileSplit, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = +new Iterator[InternalRow] { + private val dataFileIterator = read(split.dataFile, readFileFunction) + private val logSchema = getLogAvroSchema(split) + private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType] + private val converter = new AvroDeserializer(logSchema, sparkTypes) + private val hudiLogRecords = scanLog(split,
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458245661 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig} +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS + +import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import scala.util.Try + +case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition + +class HudiMergeOnReadRDD(sc: SparkContext, + broadcastedConf: Broadcast[HadoopSerializableConfiguration], + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) + extends RDD[InternalRow](sc, Nil) { + + // Broadcast the hadoop Configuration to executors. 
+ def this(sc: SparkContext, + config: Configuration, + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = { +this( + sc, + sc.broadcast(new HadoopSerializableConfiguration(config)) + .asInstanceOf[Broadcast[HadoopSerializableConfiguration]], + dataReadFunction, + dataSchema, + hudiRealtimeFileSplits) + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition] +mergeParquetPartition.split match { + case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty => +read(mergeParquetPartition.split.dataFile, dataReadFunction) + case unMergeSplit if unMergeSplit.skipMerge => +unMergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case mergeSplit if !mergeSplit.skipMerge => +mergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split") +} + } + + override protected def getPartitions: Array[Partition] = { +hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray + } + + private def getConfig(): Configuration = { +broadcastedConf.value.config + } + + private def read(partitionedFile: PartitionedFile, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = { +val fileIterator = readFileFunction(partitionedFile) +val rows = fileIterator.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala +}) +rows + } + + private def unMergeFileIterator(split: HudiMergeOnReadFileSplit, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = +new Iterator[InternalRow] { + private val dataFileIterator = read(split.dataFile, readFileFunction) + private val logSchema = getLogAvroSchema(split) + private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType] + private val converter = new AvroDeserializer(logSchema, sparkTypes) + private val hudiLogRecords = scanLog(split,
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458245661 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig} +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS + +import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import scala.util.Try + +case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition + +class HudiMergeOnReadRDD(sc: SparkContext, + broadcastedConf: Broadcast[HadoopSerializableConfiguration], + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) + extends RDD[InternalRow](sc, Nil) { + + // Broadcast the hadoop Configuration to executors. 
+ def this(sc: SparkContext, + config: Configuration, + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = { +this( + sc, + sc.broadcast(new HadoopSerializableConfiguration(config)) + .asInstanceOf[Broadcast[HadoopSerializableConfiguration]], + dataReadFunction, + dataSchema, + hudiRealtimeFileSplits) + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition] +mergeParquetPartition.split match { + case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty => +read(mergeParquetPartition.split.dataFile, dataReadFunction) + case unMergeSplit if unMergeSplit.skipMerge => +unMergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case mergeSplit if !mergeSplit.skipMerge => +mergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split") +} + } + + override protected def getPartitions: Array[Partition] = { +hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray + } + + private def getConfig(): Configuration = { +broadcastedConf.value.config + } + + private def read(partitionedFile: PartitionedFile, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = { +val fileIterator = readFileFunction(partitionedFile) +val rows = fileIterator.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala +}) +rows + } + + private def unMergeFileIterator(split: HudiMergeOnReadFileSplit, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = +new Iterator[InternalRow] { + private val dataFileIterator = read(split.dataFile, readFileFunction) + private val logSchema = getLogAvroSchema(split) + private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType] + private val converter = new AvroDeserializer(logSchema, sparkTypes) + private val hudiLogRecords = scanLog(split,
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458242933 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig} +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS + +import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import scala.util.Try + +case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition + +class HudiMergeOnReadRDD(sc: SparkContext, + broadcastedConf: Broadcast[HadoopSerializableConfiguration], + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) + extends RDD[InternalRow](sc, Nil) { + + // Broadcast the hadoop Configuration to executors. 
+ def this(sc: SparkContext, + config: Configuration, + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = { +this( + sc, + sc.broadcast(new HadoopSerializableConfiguration(config)) + .asInstanceOf[Broadcast[HadoopSerializableConfiguration]], + dataReadFunction, + dataSchema, + hudiRealtimeFileSplits) + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition] +mergeParquetPartition.split match { + case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty => +read(mergeParquetPartition.split.dataFile, dataReadFunction) + case unMergeSplit if unMergeSplit.skipMerge => +unMergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case mergeSplit if !mergeSplit.skipMerge => +mergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split") +} + } + + override protected def getPartitions: Array[Partition] = { +hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray + } + + private def getConfig(): Configuration = { +broadcastedConf.value.config + } + + private def read(partitionedFile: PartitionedFile, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = { +val fileIterator = readFileFunction(partitionedFile) +val rows = fileIterator.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala +}) +rows + } + + private def unMergeFileIterator(split: HudiMergeOnReadFileSplit, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = +new Iterator[InternalRow] { + private val dataFileIterator = read(split.dataFile, readFileFunction) + private val logSchema = getLogAvroSchema(split) + private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType] + private val converter = new AvroDeserializer(logSchema, sparkTypes) + private val hudiLogRecords = scanLog(split,
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458241891 ## File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala ## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig} +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS + +import org.apache.avro.Schema +import org.apache.hadoop.conf.Configuration +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.JavaConverters._ +import scala.util.Try + +case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition + +class HudiMergeOnReadRDD(sc: SparkContext, + broadcastedConf: Broadcast[HadoopSerializableConfiguration], + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) + extends RDD[InternalRow](sc, Nil) { + + // Broadcast the hadoop Configuration to executors. 
+ def this(sc: SparkContext, + config: Configuration, + dataReadFunction: PartitionedFile => Iterator[Any], + dataSchema: StructType, + hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = { +this( + sc, + sc.broadcast(new HadoopSerializableConfiguration(config)) + .asInstanceOf[Broadcast[HadoopSerializableConfiguration]], + dataReadFunction, + dataSchema, + hudiRealtimeFileSplits) + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition] +mergeParquetPartition.split match { + case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty => +read(mergeParquetPartition.split.dataFile, dataReadFunction) + case unMergeSplit if unMergeSplit.skipMerge => +unMergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case mergeSplit if !mergeSplit.skipMerge => +mergeFileIterator(mergeParquetPartition.split, dataReadFunction) + case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split") +} + } + + override protected def getPartitions: Array[Partition] = { +hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray + } + + private def getConfig(): Configuration = { +broadcastedConf.value.config + } + + private def read(partitionedFile: PartitionedFile, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = { +val fileIterator = readFileFunction(partitionedFile) +val rows = fileIterator.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala +}) +rows + } + + private def unMergeFileIterator(split: HudiMergeOnReadFileSplit, + readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = +new Iterator[InternalRow] { + private val dataFileIterator = read(split.dataFile, readFileFunction) + private val logSchema = getLogAvroSchema(split) + private val sparkTypes = 
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType] + private val converter = new AvroDeserializer(logSchema, sparkTypes) + private val hudiLogRecords = scanLog(split,
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458237536 ## File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile, +logPaths: Option[List[String]], +latestCommit: String, +tablePath: String, +maxCompactionMemoryInBytes: Long, +skipMerge: Boolean) + +class SnapshotRelation (val sqlContext: SQLContext, +val optParams: Map[String, String], +val userSchema: StructType, +val globPaths: Seq[Path], +val metaClient: HoodieTableMetaClient) + extends BaseRelation with TableScan with Logging{ + + private val conf = sqlContext.sparkContext.hadoopConfiguration + + // use schema from latest metadata, if not present, read schema from the data file + private val latestSchema = { +val schemaUtil = new TableSchemaResolver(metaClient) +val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields) +AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) + } + + private val skipMerge = optParams.getOrElse( +DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY, 
+DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(conf)) + private val fileIndex = buildFileIndex() + + override def schema: StructType = latestSchema + + override def needConversion: Boolean = false + + override def buildScan(): RDD[Row] = { +val parquetReaderFunction = new ParquetFileFormat().buildReaderWithPartitionValues( Review comment: I think we could implement a `HoodieLogFileFormat` to read log files in the future. Wrapping the `FileFormat` here gives us a lot of flexibility to adopt other formats like ORC. Wrapping inside the `FileFormat` would also be possible by overriding `buildReaderWithPartitionValues` and calling `super.buildReaderWithPartitionValues` to get the iterator. The downside is that we would probably need two separate classes to handle ORC and Parquet. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458232450 ## File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile, +logPaths: Option[List[String]], +latestCommit: String, +tablePath: String, +maxCompactionMemoryInBytes: Long, +skipMerge: Boolean) + +class SnapshotRelation (val sqlContext: SQLContext, +val optParams: Map[String, String], +val userSchema: StructType, +val globPaths: Seq[Path], +val metaClient: HoodieTableMetaClient) + extends BaseRelation with TableScan with Logging{ + + private val conf = sqlContext.sparkContext.hadoopConfiguration + + // use schema from latest metadata, if not present, read schema from the data file + private val latestSchema = { +val schemaUtil = new TableSchemaResolver(metaClient) +val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields) +AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) + } + + private val skipMerge = optParams.getOrElse( +DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY, 
+DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(conf)) + private val fileIndex = buildFileIndex() + + override def schema: StructType = latestSchema + + override def needConversion: Boolean = false + + override def buildScan(): RDD[Row] = { +val parquetReaderFunction = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = sqlContext.sparkSession, + dataSchema = latestSchema, + partitionSchema = StructType(Nil), + requiredSchema = latestSchema, + filters = Seq.empty, Review comment: This field requires a `Seq[Filter]`. With `PrunedFilteredScan`, we can simply pass through whatever filters Spark hands to `buildScan(requiredColumns, filters)`. Until then, this field can be an empty `Seq`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458230756 ## File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile, +logPaths: Option[List[String]], +latestCommit: String, +tablePath: String, +maxCompactionMemoryInBytes: Long, +skipMerge: Boolean) + +class SnapshotRelation (val sqlContext: SQLContext, +val optParams: Map[String, String], +val userSchema: StructType, +val globPaths: Seq[Path], +val metaClient: HoodieTableMetaClient) + extends BaseRelation with TableScan with Logging{ + + private val conf = sqlContext.sparkContext.hadoopConfiguration + + // use schema from latest metadata, if not present, read schema from the data file + private val latestSchema = { +val schemaUtil = new TableSchemaResolver(metaClient) +val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields) +AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) + } + + private val skipMerge = optParams.getOrElse( +DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY, 
+DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(conf)) + private val fileIndex = buildFileIndex() + + override def schema: StructType = latestSchema + + override def needConversion: Boolean = false + + override def buildScan(): RDD[Row] = { +val parquetReaderFunction = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = sqlContext.sparkSession, + dataSchema = latestSchema, + partitionSchema = StructType(Nil), + requiredSchema = latestSchema, Review comment: This is related to `PrunedFilteredScan`. If we don't read all fields here, we need to support merging two records with different schemas. This is targeted for the 0.6.0 release. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458224048 ## File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala ## @@ -110,6 +112,10 @@ object DataSourceReadOptions { */ val INCR_PATH_GLOB_OPT_KEY = "hoodie.datasource.read.incr.path.glob" val DEFAULT_INCR_PATH_GLOB_OPT_VAL = "" + + + val REALTIME_SKIP_MERGE_KEY = REALTIME_SKIP_MERGE_PROP Review comment: Agreed. Maybe something like `SNAPSHOT_READ_STRATEGY`, so that we can control the logic for `merge`, `unmerge`, `mergeWithBootstrap`, etc. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458215093 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java ## @@ -147,12 +146,4 @@ public Schema getWriterSchema() { public Schema getHiveSchema() { return hiveSchema; } - - public long getMaxCompactionMemoryInBytes() { Review comment: Moved to a utils class, because this method needs to be called from `HoodieMergeOnReadRDD`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r458213831 ## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HadoopSerializableConfiguration.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.config; + +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +public class HadoopSerializableConfiguration implements Serializable { Review comment: Didn't know this. Will switch over. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] garyli1019 commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach
garyli1019 commented on a change in pull request #1848: URL: https://github.com/apache/hudi/pull/1848#discussion_r457844997 ## File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile, +logPaths: Option[List[String]], +latestCommit: String, +tablePath: String, +maxCompactionMemoryInBytes: Long, +skipMerge: Boolean) + +class SnapshotRelation (val sqlContext: SQLContext, +val optParams: Map[String, String], +val userSchema: StructType, +val globPaths: Seq[Path], +val metaClient: HoodieTableMetaClient) + extends BaseRelation with TableScan with Logging{ Review comment: `PrunedFilteredScan` will change the behavior of `ParquetRecordReader` inside `ParquetFileFormat` even when we are not using the vectorized reader. Still trying to figure out why... I will follow up with `PrunedFilteredScan` in a separate PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org