[ https://issues.apache.org/jira/browse/HUDI-4123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raymond Xu updated HUDI-4123: ----------------------------- Story Points: 1 > HoodieDeltaStreamer throws exception due to SqlSource return null checkpoint > ---------------------------------------------------------------------------- > > Key: HUDI-4123 > URL: https://issues.apache.org/jira/browse/HUDI-4123 > Project: Apache Hudi > Issue Type: Bug > Components: deltastreamer > Reporter: 董可伦 > Assignee: 董可伦 > Priority: Critical > Labels: pull-request-available > Fix For: 0.12.1 > > > When use SqlSource: > ## Create hive source table > ```sql > create database test location '/test'; > create table test.test_source ( > id int, > name string, > price double, > dt string, > ts bigint > ); > insert into test.test_source values (105,'hudi', 10.0,'2021-05-05',100); > ``` > ## Use SqlSource > sql_source.properties > ``` > hoodie.datasource.write.recordkey.field=id > hoodie.datasource.write.partitionpath.field=dt > hoodie.deltastreamer.source.sql.sql.query = select * from test.test_source > hoodie.datasource.hive_sync.table=test_hudi_target > hoodie.datasource.hive_sync.database=hudi > hoodie.datasource.hive_sync.partition_fields=dt > hoodie.datasource.hive_sync.create_managed_table = true > hoodie.datasource.write.hive_style_partitioning=true > hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator > hoodie.datasource.hive_sync.use_jdbc=false > hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor > ``` > ```bash > spark-submit --conf "spark.sql.catalogImplementation=hive" \ > --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 > --executor-cores 2 --driver-memory 4G --driver-cores 2 \ > --principal spark/indata-10-110-105-163.indata....@indata.com --keytab > /etc/security/keytabs/spark.service.keytab \ > --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer > /usr/hdp/3.1.0.0-78/spark2/jars/hudi-utilities-bundle_2.11-0.12.0-SNAPSHOT.jar > \ > --props file:///opt/sql_source.properties \ > --target-base-path /hudi/test_hudi_target \ > --target-table test_hudi_target \ > --op BULK_INSERT \ > --table-type COPY_ON_WRITE \ > --source-ordering-field ts \ > --source-class org.apache.hudi.utilities.sources.SqlSource \ > --enable-sync \ > --checkpoint earliest \ > --allow-commit-on-no-checkpoint-change > ``` > Once executed, the hive source table can be successfully written to the Hudi > target table. > However, if it is executed multiple times, such as the second time, an > exception will be thrown: > ``` > org.apache.hudi.utilities.exception.HoodieDeltaStreamerException: Unable to > find previous checkpoint. Please double check if this table was indeed built > via delta streamer. Last Commit : > "deltastreamer.checkpoint.reset_key" : "earliest" > ``` > The reason is that the value of `deltastreamer.checkpoint.reset_key` is > `earliest`,but `deltastreamer.checkpoint.key` is null, > According to the logic of the method `getCheckpointToResume`,Will throw this > exception. > I think since the value of `deltastreamer.checkpoint.reset_key` is null, The > value of `deltastreamer.checkpoint.key`should also be saved as null.This also > avoids this exception according to the logic of the method > `getCheckpointToResume` > > > org.apache.hudi.utilities.exception.HoodieDeltaStreamerException: Unable to > find previous checkpoint. Please double check if this table was indeed built > via delta streamer. Last Commit > :Option{val=[20220519162403646__commit__COMPLETED]}, Instants > :[[20220519162403646__commit__COMPLETED]], CommitMetadata={ > "partitionToWriteStats" : { > "2016/03/15" : [ > { "fileId" : "6a1e0512-508a-4bdb-ad8f-200cda157ff0-0", "path" : > "2016/03/15/6a1e0512-508a-4bdb-ad8f-200cda157ff0-0_0-21-21_20220519162403646.parquet", > "prevCommit" : "null", "numWrites" : 342, "numDeletes" : > 0, "numUpdateWrites" : 0, "numInserts" : 342, > "totalWriteBytes" : 481336, "totalWriteErrors" : 0, "tempPath" : > null, "partitionPath" : "2016/03/15", "totalLogRecords" : 0, > "totalLogFilesCompacted" : 0, "totalLogSizeCompacted" : 0, > "totalUpdatedRecordsCompacted" : 0, "totalLogBlocks" : 0, > "totalCorruptLogBlock" : 0, "totalRollbackBlocks" : 0, > "fileSizeInBytes" : 481336, "minEventTime" : null, "maxEventTime" > : null } > ], > "2015/03/16" : [ > { "fileId" : "f3371308-8809-4644-baf6-c65c3fb86c8e-0", "path" : > "2015/03/16/f3371308-8809-4644-baf6-c65c3fb86c8e-0_1-21-22_20220519162403646.parquet", > "prevCommit" : "null", "numWrites" : 340, "numDeletes" : > 0, "numUpdateWrites" : 0, "numInserts" : 340, > "totalWriteBytes" : 481106, "totalWriteErrors" : 0, "tempPath" : > null, "partitionPath" : "2015/03/16", "totalLogRecords" : 0, > "totalLogFilesCompacted" : 0, "totalLogSizeCompacted" : 0, > "totalUpdatedRecordsCompacted" : 0, "totalLogBlocks" : 0, > "totalCorruptLogBlock" : 0, "totalRollbackBlocks" : 0, > "fileSizeInBytes" : 481106, "minEventTime" : null, "maxEventTime" > : null } > ], > "2015/03/17" : [ > { "fileId" : "672dde90-af93-4f05-8519-fea783295aa6-0", "path" : > "2015/03/17/672dde90-af93-4f05-8519-fea783295aa6-0_2-21-23_20220519162403646.parquet", > "prevCommit" : "null", "numWrites" : 318, "numDeletes" : > 0, "numUpdateWrites" : 0, "numInserts" : 318, > "totalWriteBytes" : 478586, "totalWriteErrors" : 0, "tempPath" : > null, "partitionPath" : "2015/03/17", "totalLogRecords" : 0, > "totalLogFilesCompacted" : 0, "totalLogSizeCompacted" : 0, > "totalUpdatedRecordsCompacted" : 0, "totalLogBlocks" : 0, > "totalCorruptLogBlock" : 0, "totalRollbackBlocks" : 0, > "fileSizeInBytes" : 478586, "minEventTime" : null, "maxEventTime" > : null } > ] > }, > "compacted" : false, > "extraMetadata" : { > "schema" : > "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[ > {\"name\":\"timestamp\",\"type\":[\"null\",\"long\"],\"default\":null} > ,{\"name\":\"_row_key\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"partition_path\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rider\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"driver\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"begin_lat\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"begin_lon\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"end_lat\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"end_lon\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"distance_in_meters\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"seconds_since_epoch\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"weight\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"nation\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"current_date\",\"type\":[\"null\", > {\"type\":\"int\",\"logicalType\":\"date\"} > ],\"default\":null},{\"name\":\"current_ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"height\",\"type\":[\"null\", > {\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.source.hoodie_source.height\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6} > ],\"default\":null},{\"name\":\"city_to_state\",\"type\":[\"null\", > {\"type\":\"map\",\"values\":[\"string\",\"null\"]} > ],\"default\":null},{\"name\":\"fare\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"fare\",\"namespace\":\"hoodie.source.hoodie_source\",\"fields\":[ > {\"name\":\"amount\",\"type\":[\"null\",\"double\"],\"default\":null} > ,{\"name\":\"currency\",\"type\":[\"null\",\"string\"],\"default\":null}]}],\"default\":null},{\"name\":\"tip_history\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"tip_history\",\"namespace\":\"hoodie.source.hoodie_source\",\"fields\":[ > {\"name\":\"amount\",\"type\":[\"null\",\"double\"],\"default\":null} > ,{\"name\":\"currency\",\"type\":[\"null\",\"string\"],\"default\":null}]},\"null\"]}],\"default\":null},{\"name\":\"_hoodie_is_deleted\",\"type\":[\"null\",\"boolean\"],\"default\":null}]}", > "deltastreamer.checkpoint.reset_key" : "earliest" > }, > "operationType" : "INSERT", > "fileIdAndRelativePaths" : > { "672dde90-af93-4f05-8519-fea783295aa6-0" : > "2015/03/17/672dde90-af93-4f05-8519-fea783295aa6-0_2-21-23_20220519162403646.parquet", > "6a1e0512-508a-4bdb-ad8f-200cda157ff0-0" : > "2016/03/15/6a1e0512-508a-4bdb-ad8f-200cda157ff0-0_0-21-21_20220519162403646.parquet", > "f3371308-8809-4644-baf6-c65c3fb86c8e-0" : > "2015/03/16/f3371308-8809-4644-baf6-c65c3fb86c8e-0_1-21-22_20220519162403646.parquet" > } > , > "writePartitionPaths" : [ "2016/03/15", "2015/03/16", "2015/03/17" ], > "totalLogRecordsCompacted" : 0, > "totalLogFilesSize" : 0, > "totalLogFilesCompacted" : 0, > "totalUpsertTime" : 0, > "totalCreateTime" : 0, > "minAndMaxEventTime" : { > "Optional.empty" : > { "val" : null, "present" : false } > }, > "totalRecordsDeleted" : 0, > "totalCompactedRecordsUpdated" : 0, > "totalScanTime" : 0, > "writeStats" : [ > { "fileId" : "6a1e0512-508a-4bdb-ad8f-200cda157ff0-0", "path" : > "2016/03/15/6a1e0512-508a-4bdb-ad8f-200cda157ff0-0_0-21-21_20220519162403646.parquet", > "prevCommit" : "null", "numWrites" : 342, "numDeletes" : 0, > "numUpdateWrites" : 0, "numInserts" : 342, "totalWriteBytes" : > 481336, "totalWriteErrors" : 0, "tempPath" : null, > "partitionPath" : "2016/03/15", "totalLogRecords" : 0, > "totalLogFilesCompacted" : 0, "totalLogSizeCompacted" : 0, > "totalUpdatedRecordsCompacted" : 0, "totalLogBlocks" : 0, > "totalCorruptLogBlock" : 0, "totalRollbackBlocks" : 0, > "fileSizeInBytes" : 481336, "minEventTime" : null, "maxEventTime" : > null } > , > { "fileId" : "f3371308-8809-4644-baf6-c65c3fb86c8e-0", "path" : > "2015/03/16/f3371308-8809-4644-baf6-c65c3fb86c8e-0_1-21-22_20220519162403646.parquet", > "prevCommit" : "null", "numWrites" : 340, "numDeletes" : 0, > "numUpdateWrites" : 0, "numInserts" : 340, "totalWriteBytes" : > 481106, "totalWriteErrors" : 0, "tempPath" : null, > "partitionPath" : "2015/03/16", "totalLogRecords" : 0, > "totalLogFilesCompacted" : 0, "totalLogSizeCompacted" : 0, > "totalUpdatedRecordsCompacted" : 0, "totalLogBlocks" : 0, > "totalCorruptLogBlock" : 0, "totalRollbackBlocks" : 0, > "fileSizeInBytes" : 481106, "minEventTime" : null, "maxEventTime" : > null } > , > { "fileId" : "672dde90-af93-4f05-8519-fea783295aa6-0", "path" : > "2015/03/17/672dde90-af93-4f05-8519-fea783295aa6-0_2-21-23_20220519162403646.parquet", > "prevCommit" : "null", "numWrites" : 318, "numDeletes" : 0, > "numUpdateWrites" : 0, "numInserts" : 318, "totalWriteBytes" : > 478586, "totalWriteErrors" : 0, "tempPath" : null, > "partitionPath" : "2015/03/17", "totalLogRecords" : 0, > "totalLogFilesCompacted" : 0, "totalLogSizeCompacted" : 0, > "totalUpdatedRecordsCompacted" : 0, "totalLogBlocks" : 0, > "totalCorruptLogBlock" : 0, "totalRollbackBlocks" : 0, > "fileSizeInBytes" : 478586, "minEventTime" : null, "maxEventTime" : > null } > ] > } > at > org.apache.hudi.utilities.deltastreamer.DeltaSync.getCheckpointToResume(DeltaSync.java:527) > at > org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:364) > at > org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:302) > at > org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:201) > at org.apache.hudi.common.util.Option.ifPresent(Option.java:97) > at > org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:199) > at > org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.testSqlSourceSource(TestHoodieDeltaStreamer.java:1962) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at java.util.ArrayList.forEach(ArrayList.java:1257) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at java.util.ArrayList.forEach(ArrayList.java:1257) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) > at > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) > at > org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) > at > org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:87) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:53) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:66) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:51) > at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:87) > at > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:66) > at > com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38) > at > com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) -- This message was sent by Atlassian Jira (v8.20.10#820010)