[ 
https://issues.apache.org/jira/browse/HUDI-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hui An reassigned HUDI-5253:
----------------------------

    Assignee: Hui An

> HoodieMergeOnReadTableInputFormat could return duplicate records if a path 
> contains delta files while still being treated as splittable
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-5253
>                 URL: https://issues.apache.org/jira/browse/HUDI-5253
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: reader-core
>            Reporter: Hui An
>            Assignee: Hui An
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.12.2
>
>
> We also sometimes see an {{IllegalStateException}} (duplicate key error) 
> thrown when running the CI, for example:
> {code:java}
>  java.lang.IllegalStateException: Duplicate key {"_hoodie_commit_time": 
> 20221122170106908, "_hoodie_commit_seqno": 20221122170106908_0_48, 
> "_hoodie_record_key": 53c028d3-e4b0-4d6f-a041-f09e418c36b3, 
> "_hoodie_partition_path": 2016/03/15, "_hoodie_file_name": 
> 6a9fff5b-3b75-48ad-85fe-c621c9a2c25d-0, "timestamp": 0, "_row_key": 
> 53c028d3-e4b0-4d6f-a041-f09e418c36b3, "partition_path": 2016/03/15, "rider": 
> rider-20221122170106908, "driver": driver-20221122170106908, "begin_lat": 
> 0.5407076277518825, "begin_lon": 0.39726822192851885, "end_lat": 
> 0.49363027135660975, "end_lon": 0.6482366665027408, "distance_in_meters": 
> -1534272590, "seconds_since_epoch": 6103867871123100710, "weight": 
> 0.38126373, "nation": 7b 62 79 74 65 73 3d 43 61 6e 61 64 61 7d, 
> "current_date": 1970-01-17, "current_ts": 1460315658, "height": 0.093258, 
> "city_to_state": org.apache.hadoop.io.ArrayWritable@1b28684, "fare": 
> org.apache.hadoop.io.ArrayWritable@694b818f, "tip_history": 
> org.apache.hadoop.io.ArrayWritable@63cb1a9a, "_hoodie_is_deleted": false}
>       at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>       at java.util.HashMap.merge(HashMap.java:1254)
>       at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>       at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>       at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>       at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>       at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>       at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>       at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>       at 
> org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable(GenericRecordValidationTestUtils.java:95)
>       at 
> org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable(GenericRecordValidationTestUtils.java:80)
>       at 
> org.apache.hudi.client.functional.TestHoodieClientOnMergeOnReadStorage.testLogCompactionOnMORTable(TestHoodieClientOnMergeOnReadStorage.java:187)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>       at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>       at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>       at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>       at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>       at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>       at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>       at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>       at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>       at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>       at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>       at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>       at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>       at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>       at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
>       at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
>       at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
>       at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
>       at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>       at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>       at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>       at java.util.ArrayList.forEach(ArrayList.java:1259)
>       at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>       at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>       at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>       at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>       at java.util.ArrayList.forEach(ArrayList.java:1259)
>       at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>       at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>       at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>       at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>       at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>       at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
>       at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>       at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
>       at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
>       at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>       at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>       at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>       at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>       at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
>       at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
>       at 
> com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
>       at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
>       at 
> com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
>       at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
>       at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
>       at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> {code}
> This can easily be reproduced by changing 
> {{org.apache.hudi.testutils.HoodieMergeOnReadTestUtils#getRecordsUsingInputFormat}}
> so that it creates more splits:
> {code:java}
> FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
> // Add 3 to the inputPaths to create more splits
> InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size() + 3);
> for (InputSplit split : splits) {
> // ...
> }
> {code}
> A path must therefore not be treated as splittable if it contains delta files.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to