xiarixiaoyao commented on a change in pull request #2722: URL: https://github.com/apache/hudi/pull/2722#discussion_r615510606
########## File path: hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java ########## @@ -84,6 +86,73 @@ public void setUp() throws IOException, InterruptedException { HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); } + @Test + public void testMutilReaderRealtimeComineHoodieInputFormat() throws Exception { + // test for hudi-1722 + Configuration conf = new Configuration(); + // initial commit + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); + HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); + String commitTime = "100"; + final int numRecords = 1000; + // Create 3 parquet files with 1000 records each + File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime); + InputFormatTestUtil.commit(tempDir, commitTime); + + String newCommitTime = "101"; + // to trigger the bug of HUDI-1772, only update fileid2 + // insert 1000 update records to log file 2 + // now fileid0, fileid1 has no log files, fileid2 has log file + HoodieLogFormat.Writer writer = + InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime, + numRecords, numRecords, 0); + writer.close(); + + TableDesc tblDesc = Utilities.defaultTd; + // Set the input format + tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class); + PartitionDesc partDesc = new PartitionDesc(tblDesc, null); + LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>(); + LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>(); + ArrayList<String> alias = new ArrayList<>(); + alias.add(tempDir.toAbsolutePath().toString()); + tableAlias.put(new Path(tempDir.toAbsolutePath().toString()), alias); + pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc); + + MapredWork mrwork = new MapredWork(); 
+ mrwork.getMapWork().setPathToPartitionInfo(pt); + mrwork.getMapWork().setPathToAliases(tableAlias); + Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString()); + Utilities.setMapRedWork(conf, mrwork, mapWorkPath); + jobConf = new JobConf(conf); + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + jobConf.set(HAS_MAP_WORK, "true"); + // The following config tells Hive to choose ExecMapper to read the MAP_WORK + jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName()); + // set SPLIT_MAXSIZE larger to create one split for 3 files groups + jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000"); + + HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat(); + String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double"; + InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes); + InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1); + // Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 file groups + assertEquals(1, splits.length); + RecordReader<NullWritable, ArrayWritable> recordReader = Review comment: Yes, we only create one combine record reader, but this reader holds three RealtimeCompactedRecordReaders. The execution order of the RealtimeCompactedRecordReaders leads to this NPE problem. For example, in this test: the combine record reader holds three RealtimeCompactedRecordReaders; we call them creader1, creader2, and creader3. creader1: only has a base file. creader2: only has a base file. creader3: has a base file and a log file. If creader3 is created first, the Hoodie additional projection columns will be added to the jobConf, and in this case the query will be OK. However, if creader1 or creader2 is created first, no Hoodie additional projection columns will be added to the jobConf, and the query will fail. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org