lw309637554 commented on a change in pull request #2722:
URL: https://github.com/apache/hudi/pull/2722#discussion_r615365836



##########
File path: 
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
##########
@@ -84,6 +86,73 @@ public void setUp() throws IOException, InterruptedException 
{
     HoodieTestUtils.init(MiniClusterUtil.configuration, 
tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
   }
 
+  @Test
+  public void testMutilReaderRealtimeComineHoodieInputFormat() throws 
Exception {
+    // test for hudi-1722
+    Configuration conf = new Configuration();
+    // initial commit
+    Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), 
HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    final int numRecords = 1000;
+    // Create 3 parquet files with 1000 records each
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, 
schema, 3, numRecords, commitTime);
+    InputFormatTestUtil.commit(tempDir, commitTime);
+
+    String newCommitTime = "101";
+    // to trigger the bug of HUDI-1772, only update fileid2
+    // insert 1000 update records to log file 2
+    // now fileid0, fileid1 has no log files, fileid2 has log file
+    HoodieLogFormat.Writer writer =
+            InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, 
schema, "fileid2", commitTime, newCommitTime,
+                    numRecords, numRecords, 0);
+    writer.close();
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    // Set the input format
+    tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>();
+    ArrayList<String> alias = new ArrayList<>();
+    alias.add(tempDir.toAbsolutePath().toString());
+    tableAlias.put(new Path(tempDir.toAbsolutePath().toString()), alias);
+    pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
+
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    mrwork.getMapWork().setPathToAliases(tableAlias);
+    Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
+    Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
+    jobConf = new JobConf(conf);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    jobConf.set(HAS_MAP_WORK, "true");
+    // The following config tells Hive to choose ExecMapper to read the 
MAP_WORK
+    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+    // set SPLIT_MAXSIZE larger  to create one split for 3 files groups
+    
jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE,
 "128000000");
+
+    HoodieCombineHiveInputFormat combineHiveInputFormat = new 
HoodieCombineHiveInputFormat();
+    String tripsHiveColumnTypes = 
"double,string,string,string,double,double,double,double,double";
+    InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, 
tripsHiveColumnTypes);
+    InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
+    // Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 
file groups
+    assertEquals(1, splits.length);
+    RecordReader<NullWritable, ArrayWritable> recordReader =

Review comment:
       Hello, I see only one RecordReader here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to