xiarixiaoyao commented on a change in pull request #2722:
URL: https://github.com/apache/hudi/pull/2722#discussion_r615510606



##########
File path: hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
##########
@@ -84,6 +86,73 @@ public void setUp() throws IOException, InterruptedException {
     HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
   }
 
+  @Test
+  public void testMultiReaderRealtimeCombineHoodieInputFormat() throws Exception {
+    // test for HUDI-1722
+    Configuration conf = new Configuration();
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    final int numRecords = 1000;
+    // Create 3 parquet files with 1000 records each
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
+    InputFormatTestUtil.commit(tempDir, commitTime);
+
+    String newCommitTime = "101";
+    // to trigger the bug of HUDI-1722, only update fileid2
+    // insert 1000 update records into the log file of fileid2
+    // now fileid0 and fileid1 have no log files, while fileid2 has a log file
+    HoodieLogFormat.Writer writer =
+            InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime,
+                    numRecords, numRecords, 0);
+    writer.close();
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    // Set the input format
+    tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>();
+    ArrayList<String> alias = new ArrayList<>();
+    alias.add(tempDir.toAbsolutePath().toString());
+    tableAlias.put(new Path(tempDir.toAbsolutePath().toString()), alias);
+    pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
+
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    mrwork.getMapWork().setPathToAliases(tableAlias);
+    Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
+    Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
+    jobConf = new JobConf(conf);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    jobConf.set(HAS_MAP_WORK, "true");
+    // The following config tells Hive to choose ExecMapper to read the MAP_WORK
+    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+    // set SPLIT_MAXSIZE large enough to create one split for the 3 file groups
+    jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
+
+    HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
+    String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
+    InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
+    InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
+    // Since SPLIT_MAXSIZE is large enough, we should create only 1 split with all 3 file groups
+    assertEquals(1, splits.length);
+    RecordReader<NullWritable, ArrayWritable> recordReader =

Review comment:
   Yes, we only create one combined record reader, but this reader holds three
   RealtimeCompactedRecordReaders, and the order in which they are created
   causes this NPE problem.

   For example, in this test the combined reader holds three
   RealtimeCompactedRecordReaders; call them creader1, creader2, and creader3:
   creader1: only has a base file
   creader2: only has a base file
   creader3: has a base file and a log file

   If creader3 is created first, the Hudi additional projection columns are
   added to the jobConf, and in that case the query succeeds. However, if
   creader1 or creader2 is created first, the Hudi additional projection
   columns are never added to the jobConf, and the query fails.
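
   To make the creation-order dependence concrete, here is a minimal,
   self-contained sketch of the failure mode. The `SharedConf` and
   `ChildReader` names and the guard flag are hypothetical illustrations of
   the shared-jobConf behavior, not Hudi's actual classes:

   ```java
   // Sketch only: models a combined reader whose child readers share one conf.
   // Whichever child is constructed first decides the projection for everyone.
   public class ProjectionOrderSketch {

     static class SharedConf {
       boolean projectionHandled;   // "columns already projected" guard
       boolean hoodieColumnsAdded;  // _hoodie_record_key etc. in the projection
     }

     static class ChildReader {
       final boolean hasLogFiles;

       ChildReader(boolean hasLogFiles, SharedConf conf) {
         this.hasLogFiles = hasLogFiles;
         if (!conf.projectionHandled) {
           conf.projectionHandled = true;
           // Only a reader that merges log records adds the metadata columns.
           conf.hoodieColumnsAdded = hasLogFiles;
         }
       }

       void read(SharedConf conf) {
         if (hasLogFiles && !conf.hoodieColumnsAdded) {
           // The log merge needs the record key column, which was never projected.
           throw new IllegalStateException("_hoodie_record_key missing -> NPE in the real reader");
         }
       }
     }

     public static void main(String[] args) {
       SharedConf conf = new SharedConf();
       // Failing order from this test: the parquet-only readers come first.
       ChildReader creader1 = new ChildReader(false, conf);
       ChildReader creader2 = new ChildReader(false, conf);
       ChildReader creader3 = new ChildReader(true, conf);
       creader1.read(conf);
       creader2.read(conf);
       creader3.read(conf); // fails: the guard was consumed without adding the columns
     }
   }
   ```

   With creader3 constructed first, the guard and the columns are set together,
   which is why the query succeeds or fails purely on reader creation order.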



