[ https://issues.apache.org/jira/browse/HUDI-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hui An reassigned HUDI-5253: ---------------------------- Assignee: Hui An > HoodieMergeOnReadTableInputFormat could have duplicate records issue if it > contains delta files while still splittable > ---------------------------------------------------------------------------------------------------------------------- > > Key: HUDI-5253 > URL: https://issues.apache.org/jira/browse/HUDI-5253 > Project: Apache Hudi > Issue Type: Bug > Components: reader-core > Reporter: Hui An > Assignee: Hui An > Priority: Major > Labels: pull-request-available > Fix For: 0.12.2 > > > When we run the CI, it sometimes throws an {{IllegalStateException}} > with a duplicate key error: > {code:java} > java.lang.IllegalStateException: Duplicate key {"_hoodie_commit_time": > 20221122170106908, "_hoodie_commit_seqno": 20221122170106908_0_48, > "_hoodie_record_key": 53c028d3-e4b0-4d6f-a041-f09e418c36b3, > "_hoodie_partition_path": 2016/03/15, "_hoodie_file_name": > 6a9fff5b-3b75-48ad-85fe-c621c9a2c25d-0, "timestamp": 0, "_row_key": > 53c028d3-e4b0-4d6f-a041-f09e418c36b3, "partition_path": 2016/03/15, "rider": > rider-20221122170106908, "driver": driver-20221122170106908, "begin_lat": > 0.5407076277518825, "begin_lon": 0.39726822192851885, "end_lat": > 0.49363027135660975, "end_lon": 0.6482366665027408, "distance_in_meters": > -1534272590, "seconds_since_epoch": 6103867871123100710, "weight": > 0.38126373, "nation": 7b 62 79 74 65 73 3d 43 61 6e 61 64 61 7d, > "current_date": 1970-01-17, "current_ts": 1460315658, "height": 0.093258, > "city_to_state": org.apache.hadoop.io.ArrayWritable@1b28684, "fare": > org.apache.hadoop.io.ArrayWritable@694b818f, "tip_history": > org.apache.hadoop.io.ArrayWritable@63cb1a9a, "_hoodie_is_deleted": false} > at > java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) > at java.util.HashMap.merge(HashMap.java:1254) > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) > at 
java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable(GenericRecordValidationTestUtils.java:95) > at > org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable(GenericRecordValidationTestUtils.java:80) > at > org.apache.hudi.client.functional.TestHoodieClientOnMergeOnReadStorage.testLogCompactionOnMORTable(TestHoodieClientOnMergeOnReadStorage.java:187) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at java.util.ArrayList.forEach(ArrayList.java:1259) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at java.util.ArrayList.forEach(ArrayList.java:1259) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) > at > org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) > at > org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) > at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96) > at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75) > at > com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38) > at > 
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) > {code} > This is easy to reproduce by changing > {{org.apache.hudi.testutils.HoodieMergeOnReadTestUtils#getRecordsUsingInputFormat}} > so that it requests more splits: > {code:java} > FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths)); > // Request 3 more splits than there are input paths to force more splits > InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size() + 3); > for (InputSplit split : splits) { > // ... > } > {code} > A path must not be treated as splittable if it contains delta files. Otherwise, each split merges in the same delta-file records, which can produce duplicate records. -- This message was sent by Atlassian Jira (v8.20.10#820010)