[ https://issues.apache.org/jira/browse/HUDI-3057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexey Kudinkin reassigned HUDI-3057: ------------------------------------- Assignee: sivabalan narayanan > Instants should be generated strictly under locks > ------------------------------------------------- > > Key: HUDI-3057 > URL: https://issues.apache.org/jira/browse/HUDI-3057 > Project: Apache Hudi > Issue Type: Bug > Reporter: Alexey Kudinkin > Assignee: sivabalan narayanan > Priority: Major > > While looking into the flakiness of the tests outlined here: > https://issues.apache.org/jira/browse/HUDI-3043 > > I've stumbled upon following failure where one of the writers tries to > complete the Commit but it couldn't b/c such file does already exist: > {code:java} > java.util.concurrent.ExecutionException: java.lang.RuntimeException: > org.apache.hudi.exception.HoodieIOException: Failed to create file > /var/folders/kb/cnff55vj041g2nnlzs5ylqk00000gn/T/junit5142536255031969586/testtable_MERGE_ON_READ/.hoodie/20211217150157632.commit > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.hudi.utilities.functional.TestHoodieDeltaStreamerWithMultiWriter.runJobsInParallel(TestHoodieDeltaStreamerWithMultiWriter.java:336) > at > org.apache.hudi.utilities.functional.TestHoodieDeltaStreamerWithMultiWriter.testUpsertsContinuousModeWithMultipleWriters(TestHoodieDeltaStreamerWithMultiWriter.java:150) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:212) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:192) > at > org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:139) > at > org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.lambda$execute$2(TestTemplateTestDescriptor.java:107) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:440) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.Iterator.forEachRemaining(Iterator.java:116) > at > java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at > java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at > org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:107) > at > org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:42) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at java.util.ArrayList.forEach(ArrayList.java:1259) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at java.util.ArrayList.forEach(ArrayList.java:1259) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) > at > org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) > at > org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:87) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:53) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:66) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:51) > at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:87) > at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:66) > at > com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) > Caused by: java.lang.RuntimeException: > org.apache.hudi.exception.HoodieIOException: Failed to create file > /var/folders/kb/cnff55vj041g2nnlzs5ylqk00000gn/T/junit5142536255031969586/testtable_MERGE_ON_READ/.hoodie/20211217150157632.commit > at > org.apache.hudi.utilities.functional.TestHoodieDeltaStreamerWithMultiWriter.lambda$runJobsInParallel$2(TestHoodieDeltaStreamerWithMultiWriter.java:333) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.hudi.exception.HoodieIOException: Failed to create file > /var/folders/kb/cnff55vj041g2nnlzs5ylqk00000gn/T/junit5142536255031969586/testtable_MERGE_ON_READ/.hoodie/20211217150157632.commit > at > org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:618) > at > org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:482) > at > org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:457) > at > org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionCompactionInflightToComplete(HoodieActiveTimeline.java:328) > at > org.apache.hudi.table.action.compact.CompactHelpers.completeInflightCompaction(CompactHelpers.java:79) > at > org.apache.hudi.client.SparkRDDWriteClient.completeCompaction(SparkRDDWriteClient.java:310) > at > org.apache.hudi.client.SparkRDDWriteClient.completeTableService(SparkRDDWriteClient.java:474) > at > org.apache.hudi.client.SparkRDDWriteClient.compact(SparkRDDWriteClient.java:344) > at > org.apache.hudi.client.SparkRDDWriteClient.compact(SparkRDDWriteClient.java:75) > at > org.apache.hudi.client.AbstractHoodieWriteClient.lambda$runAnyPendingCompactions$1(AbstractHoodieWriteClient.java:494) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at > java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) > at > org.apache.hudi.client.AbstractHoodieWriteClient.runAnyPendingCompactions(AbstractHoodieWriteClient.java:492) > at > org.apache.hudi.client.AbstractHoodieWriteClient.runTableServicesInline(AbstractHoodieWriteClient.java:472) > at > org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:206) > at > org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:125) > at > org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:536) > at > org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:308) > at > org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:193) > at org.apache.hudi.common.util.Option.ifPresent(Option.java:96) > at > org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:191) > at > org.apache.hudi.utilities.functional.TestHoodieDeltaStreamerWithMultiWriter.lambda$runJobsInParallel$2(TestHoodieDeltaStreamerWithMultiWriter.java:331) > ... 5 more > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already > exists: > file:/var/folders/kb/cnff55vj041g2nnlzs5ylqk00000gn/T/junit5142536255031969586/testtable_MERGE_ON_READ/.hoodie/20211217150157632.commit > at > org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:289) > at > org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328) > at > org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398) > at > org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461) > at > org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789) > at > org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$create$2(HoodieWrapperFileSystem.java:221) > at > org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:100) > at > org.apache.hudi.common.fs.HoodieWrapperFileSystem.create(HoodieWrapperFileSystem.java:220) > at > org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:613) > ... 26 more > {code} > I've skimmed through the code and it seems that we're actually grabbing an > Instant for the operation *before* __ we start the txn (and acquire locks), > while we actually should do it strictly *under* locks (ie w/in the txn) to > guarantee that all operations on the timeline are ordered appropriately. -- This message was sent by Atlassian Jira (v8.20.1#820001)