abstractdog commented on code in PR #3836:
URL: https://github.com/apache/hive/pull/3836#discussion_r1067068061
##########
ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java:
##########
@@ -162,6 +166,59 @@ public void testNonAcidDynamicPartitioning() throws
Exception {
confirmOutput(DataFormat.WITH_PARTITION_VALUE);
}
+ @Test
+ public void testNonAcidRemoveDuplicate() throws Exception {
+ setBasePath("writeDuplicate");
+ setupData(DataFormat.WITH_PARTITION_VALUE);
+
+ FileSinkDesc desc = (FileSinkDesc)
getFileSink(AcidUtils.Operation.NOT_ACID, true, 0).getConf().clone();
+ desc.setLinkedFileSink(true);
+ desc.setDirName(new Path(desc.getDirName(),
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "0"));
+ JobConf jobConf = new JobConf(jc);
+ jobConf.set("hive.execution.engine", "tez");
+ jobConf.set("mapred.task.id", "000000_0");
+ FileSinkOperator op1 = (FileSinkOperator)OperatorFactory.get(new
CompilationOpContext(), FileSinkDesc.class);
+ op1.setConf(desc);
+ op1.initialize(jobConf, new ObjectInspector[]{inspector});
+
+ JobConf jobConf2 = new JobConf(jobConf);
+ jobConf2.set("mapred.task.id", "000000_1");
+ FileSinkOperator speculative = (FileSinkOperator)OperatorFactory.get(
+ new CompilationOpContext(), FileSinkDesc.class);
+ speculative.setConf(desc);
+ speculative.initialize(jobConf2, new ObjectInspector[]{inspector});
+
+ for (Object r : rows) {
+ op1.process(r, 0);
+ speculative.process(r, 0);
+ }
+
+ op1.close(false);
+ // speculative task also ends successfully
+ speculative.close(false);
+ Path[] paths = findFilesInBasePath();
+ List<Path> mondays = Arrays.stream(paths)
+ .filter(path ->
path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
+ .collect(Collectors.toList());
+ Assert.assertTrue(mondays.size() == 2);
+ Set<String> fileNames = new HashSet<>();
+ fileNames.add(mondays.get(0).getName());
+ fileNames.add(mondays.get(1).getName());
+ Assert.assertTrue(fileNames.contains("000000_1") &&
fileNames.contains("000000_0"));
+
+ op1.jobCloseOp(jobConf, true);
+ List<Path> resultFiles = new ArrayList<Path>();
+ recurseOnPath(basePath, basePath.getFileSystem(jc), resultFiles);
+ mondays = resultFiles.stream()
+ .filter(path ->
path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
+ .collect(Collectors.toList());
+ Assert.assertTrue(mondays.size() == 1);
Review Comment:
Please use Assert.assertEquals, which reports the expected vs. actual values when the assertion fails.
##########
ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java:
##########
@@ -162,6 +166,59 @@ public void testNonAcidDynamicPartitioning() throws
Exception {
confirmOutput(DataFormat.WITH_PARTITION_VALUE);
}
+ @Test
+ public void testNonAcidRemoveDuplicate() throws Exception {
+ setBasePath("writeDuplicate");
+ setupData(DataFormat.WITH_PARTITION_VALUE);
+
+ FileSinkDesc desc = (FileSinkDesc)
getFileSink(AcidUtils.Operation.NOT_ACID, true, 0).getConf().clone();
+ desc.setLinkedFileSink(true);
+ desc.setDirName(new Path(desc.getDirName(),
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "0"));
+ JobConf jobConf = new JobConf(jc);
+ jobConf.set("hive.execution.engine", "tez");
+ jobConf.set("mapred.task.id", "000000_0");
+ FileSinkOperator op1 = (FileSinkOperator)OperatorFactory.get(new
CompilationOpContext(), FileSinkDesc.class);
+ op1.setConf(desc);
+ op1.initialize(jobConf, new ObjectInspector[]{inspector});
+
+ JobConf jobConf2 = new JobConf(jobConf);
+ jobConf2.set("mapred.task.id", "000000_1");
+ FileSinkOperator speculative = (FileSinkOperator)OperatorFactory.get(
+ new CompilationOpContext(), FileSinkDesc.class);
+ speculative.setConf(desc);
+ speculative.initialize(jobConf2, new ObjectInspector[]{inspector});
+
+ for (Object r : rows) {
+ op1.process(r, 0);
+ speculative.process(r, 0);
+ }
+
+ op1.close(false);
+ // speculative task also ends successfully
+ speculative.close(false);
+ Path[] paths = findFilesInBasePath();
+ List<Path> mondays = Arrays.stream(paths)
+ .filter(path ->
path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
+ .collect(Collectors.toList());
+ Assert.assertTrue(mondays.size() == 2);
+ Set<String> fileNames = new HashSet<>();
+ fileNames.add(mondays.get(0).getName());
+ fileNames.add(mondays.get(1).getName());
+ Assert.assertTrue(fileNames.contains("000000_1") &&
fileNames.contains("000000_0"));
+
+ op1.jobCloseOp(jobConf, true);
+ List<Path> resultFiles = new ArrayList<Path>();
+ recurseOnPath(basePath, basePath.getFileSystem(jc), resultFiles);
+ mondays = resultFiles.stream()
+ .filter(path ->
path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
+ .collect(Collectors.toList());
+ Assert.assertTrue(mondays.size() == 1);
+ Assert.assertTrue(mondays.get(0).getName().equals("000000_1"));
Review Comment:
Similarly, please use Assert.assertEquals here as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]