This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e90f14  Fix bug when importing files with the same name in different directories (#8337)
1e90f14 is described below

commit 1e90f141282e40f819de806920cc2a836e0e35ba
Author: Mark Needham <[email protected]>
AuthorDate: Tue Mar 22 18:35:23 2022 +0000

    Fix bug when importing files with the same name in different directories (#8337)
---
 .../standalone/SegmentGenerationJobRunner.java     |  8 ++-
 .../standalone/SegmentGenerationJobRunnerTest.java | 84 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 1 deletion(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index 86480ec..7c84945 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -238,7 +238,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
     FileUtils.forceMkdir(localOutputTempDir);
 
     //copy input path to local
-    File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
+    File localInputDataFile = createLocalInputDateFile(inputFileURI, localInputTempDir);
     _inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
 
     //create task spec
@@ -290,4 +290,10 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
       }
     });
   }
+
+  private File createLocalInputDateFile(URI inputFileURI, File localInputTempDir) {
+    String inputFileURIPath = inputFileURI.getPath();
+    File localInputFileDir = new File(localInputTempDir, UUID.randomUUID().toString());
+    return new File(localInputFileDir, new File(inputFileURIPath).getName());
+  }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
index 1f920c4..2e39c43 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
@@ -139,4 +139,88 @@ public class SegmentGenerationJobRunnerTest {
 
     // FUTURE - validate contents of file?
   }
+
+  @Test
+  public void testInputFilesWithSameNameInDifferentDirectories()
+      throws Exception {
+    File testDir = Files.createTempDirectory("testSegmentGeneration-").toFile();
+    testDir.delete();
+    testDir.mkdirs();
+
+    File inputDir = new File(testDir, "input");
+    File inputSubDir1 = new File(inputDir, "2009");
+    File inputSubDir2 = new File(inputDir, "2010");
+    inputSubDir1.mkdirs();
+    inputSubDir2.mkdirs();
+
+    File inputFile1 = new File(inputSubDir1, "input.csv");
+    FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2", "value1,1", "value2,2"));
+
+    File inputFile2 = new File(inputSubDir2, "input.csv");
+    FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2", "value3,3", "value4,4"));
+
+    File outputDir = new File(testDir, "output");
+
+    // Set up schema file.
+    final String schemaName = "mySchema";
+    File schemaFile = new File(testDir, "schema");
+    Schema schema = new SchemaBuilder()
+        .setSchemaName(schemaName)
+        .addSingleValueDimension("col1", DataType.STRING)
+        .addMetric("col2", DataType.INT)
+        .build();
+    FileUtils.write(schemaFile, schema.toPrettyJsonString(), StandardCharsets.UTF_8);
+
+    // Set up table config file.
+    File tableConfigFile = new File(testDir, "tableConfig");
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName("myTable")
+        .setSchemaName(schemaName)
+        .setNumReplicas(1)
+        .build();
+    FileUtils.write(tableConfigFile, tableConfig.toJsonString(), StandardCharsets.UTF_8);
+
+    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+    jobSpec.setJobType("SegmentCreation");
+    jobSpec.setInputDirURI(inputDir.toURI().toString());
+    jobSpec.setOutputDirURI(outputDir.toURI().toString());
+    jobSpec.setOverwriteOutput(true);
+
+    RecordReaderSpec recordReaderSpec = new RecordReaderSpec();
+    recordReaderSpec.setDataFormat("csv");
+    recordReaderSpec.setClassName(CSVRecordReader.class.getName());
+    recordReaderSpec.setConfigClassName(CSVRecordReaderConfig.class.getName());
+    jobSpec.setRecordReaderSpec(recordReaderSpec);
+
+    TableSpec tableSpec = new TableSpec();
+    tableSpec.setTableName("myTable");
+    tableSpec.setSchemaURI(schemaFile.toURI().toString());
+    tableSpec.setTableConfigURI(tableConfigFile.toURI().toString());
+    jobSpec.setTableSpec(tableSpec);
+
+    ExecutionFrameworkSpec efSpec = new ExecutionFrameworkSpec();
+    efSpec.setName("standalone");
+    efSpec.setSegmentGenerationJobRunnerClassName(SegmentGenerationJobRunner.class.getName());
+    jobSpec.setExecutionFrameworkSpec(efSpec);
+
+    PinotFSSpec pfsSpec = new PinotFSSpec();
+    pfsSpec.setScheme("file");
+    pfsSpec.setClassName(LocalPinotFS.class.getName());
+    jobSpec.setPinotFSSpecs(Collections.singletonList(pfsSpec));
+
+    SegmentGenerationJobRunner jobRunner = new SegmentGenerationJobRunner(jobSpec);
+    jobRunner.run();
+
+    // Check that both segment files are created
+
+    File newSegmentFile2009 = new File(outputDir, "2009/myTable_OFFLINE_0.tar.gz");
+    Assert.assertTrue(newSegmentFile2009.exists());
+    Assert.assertTrue(newSegmentFile2009.isFile());
+    Assert.assertTrue(newSegmentFile2009.length() > 0);
+
+    File newSegmentFile2010 = new File(outputDir, "2010/myTable_OFFLINE_0.tar.gz");
+    Assert.assertTrue(newSegmentFile2010.exists());
+    Assert.assertTrue(newSegmentFile2010.isFile());
+    Assert.assertTrue(newSegmentFile2010.length() > 0);
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to