[ 
https://issues.apache.org/jira/browse/HUDI-4123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raymond Xu updated HUDI-4123:
-----------------------------
    Story Points: 1

> HoodieDeltaStreamer throws exception due to SqlSource return null checkpoint
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-4123
>                 URL: https://issues.apache.org/jira/browse/HUDI-4123
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: deltastreamer
>            Reporter: 董可伦
>            Assignee: 董可伦
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 0.12.1
>
>
> When use SqlSource:
> ## Create hive source table
> ```sql
> create database test location '/test';
> create table test.test_source (
>   id int,
>   name string,
>   price double,
>   dt string,
>   ts bigint
> );
> insert into test.test_source values (105,'hudi', 10.0,'2021-05-05',100);
> ```
> ## Use SqlSource
> sql_source.properties
> ```
> hoodie.datasource.write.recordkey.field=id
> hoodie.datasource.write.partitionpath.field=dt
> hoodie.deltastreamer.source.sql.sql.query = select * from test.test_source
> hoodie.datasource.hive_sync.table=test_hudi_target
> hoodie.datasource.hive_sync.database=hudi
> hoodie.datasource.hive_sync.partition_fields=dt
> hoodie.datasource.hive_sync.create_managed_table = true
> hoodie.datasource.write.hive_style_partitioning=true
> hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
> hoodie.datasource.hive_sync.use_jdbc=false
> hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
> ```
> ```bash
> spark-submit --conf "spark.sql.catalogImplementation=hive" \
> --master yarn --deploy-mode client --executor-memory 2G --num-executors 3 
> --executor-cores 2 --driver-memory 4G --driver-cores 2 \
> --principal spark/indata-10-110-105-163.indata....@indata.com --keytab 
> /etc/security/keytabs/spark.service.keytab \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer 
> /usr/hdp/3.1.0.0-78/spark2/jars/hudi-utilities-bundle_2.11-0.12.0-SNAPSHOT.jar
>  \
> --props file:///opt/sql_source.properties  \
> --target-base-path /hudi/test_hudi_target \
> --target-table test_hudi_target \
> --op BULK_INSERT \
> --table-type COPY_ON_WRITE \
> --source-ordering-field ts \
> --source-class org.apache.hudi.utilities.sources.SqlSource \
> --enable-sync  \
> --checkpoint earliest \
> --allow-commit-on-no-checkpoint-change
> ```
> Once executed, the hive source table can be successfully written to the Hudi 
> target table.
> However, if it is executed multiple times, such as the second time, an 
> exception will be thrown:
> ```
> org.apache.hudi.utilities.exception.HoodieDeltaStreamerException: Unable to 
> find previous checkpoint. Please double check if this table was indeed built 
> via delta streamer. Last Commit :
> "deltastreamer.checkpoint.reset_key" : "earliest"
> ```
> The reason is that the value of `deltastreamer.checkpoint.reset_key` is 
> `earliest`,but `deltastreamer.checkpoint.key` is null,
> According to the logic of the method `getCheckpointToResume`,Will throw this 
> exception.
> I think since  the value of `deltastreamer.checkpoint.reset_key` is null, The 
> value of `deltastreamer.checkpoint.key`should also be saved as null.This also 
> avoids this exception according to the logic of the method 
> `getCheckpointToResume`
>  
>  
> org.apache.hudi.utilities.exception.HoodieDeltaStreamerException: Unable to 
> find previous checkpoint. Please double check if this table was indeed built 
> via delta streamer. Last Commit 
> :Option{val=[20220519162403646__commit__COMPLETED]}, Instants 
> :[[20220519162403646__commit__COMPLETED]], CommitMetadata={
>   "partitionToWriteStats" : {
>     "2016/03/15" : [
> {       "fileId" : "6a1e0512-508a-4bdb-ad8f-200cda157ff0-0",       "path" : 
> "2016/03/15/6a1e0512-508a-4bdb-ad8f-200cda157ff0-0_0-21-21_20220519162403646.parquet",
>        "prevCommit" : "null",       "numWrites" : 342,       "numDeletes" : 
> 0,       "numUpdateWrites" : 0,       "numInserts" : 342,       
> "totalWriteBytes" : 481336,       "totalWriteErrors" : 0,       "tempPath" : 
> null,       "partitionPath" : "2016/03/15",       "totalLogRecords" : 0,      
>  "totalLogFilesCompacted" : 0,       "totalLogSizeCompacted" : 0,       
> "totalUpdatedRecordsCompacted" : 0,       "totalLogBlocks" : 0,       
> "totalCorruptLogBlock" : 0,       "totalRollbackBlocks" : 0,       
> "fileSizeInBytes" : 481336,       "minEventTime" : null,       "maxEventTime" 
> : null     }
> ],
>     "2015/03/16" : [
> {       "fileId" : "f3371308-8809-4644-baf6-c65c3fb86c8e-0",       "path" : 
> "2015/03/16/f3371308-8809-4644-baf6-c65c3fb86c8e-0_1-21-22_20220519162403646.parquet",
>        "prevCommit" : "null",       "numWrites" : 340,       "numDeletes" : 
> 0,       "numUpdateWrites" : 0,       "numInserts" : 340,       
> "totalWriteBytes" : 481106,       "totalWriteErrors" : 0,       "tempPath" : 
> null,       "partitionPath" : "2015/03/16",       "totalLogRecords" : 0,      
>  "totalLogFilesCompacted" : 0,       "totalLogSizeCompacted" : 0,       
> "totalUpdatedRecordsCompacted" : 0,       "totalLogBlocks" : 0,       
> "totalCorruptLogBlock" : 0,       "totalRollbackBlocks" : 0,       
> "fileSizeInBytes" : 481106,       "minEventTime" : null,       "maxEventTime" 
> : null     }
> ],
>     "2015/03/17" : [
> {       "fileId" : "672dde90-af93-4f05-8519-fea783295aa6-0",       "path" : 
> "2015/03/17/672dde90-af93-4f05-8519-fea783295aa6-0_2-21-23_20220519162403646.parquet",
>        "prevCommit" : "null",       "numWrites" : 318,       "numDeletes" : 
> 0,       "numUpdateWrites" : 0,       "numInserts" : 318,       
> "totalWriteBytes" : 478586,       "totalWriteErrors" : 0,       "tempPath" : 
> null,       "partitionPath" : "2015/03/17",       "totalLogRecords" : 0,      
>  "totalLogFilesCompacted" : 0,       "totalLogSizeCompacted" : 0,       
> "totalUpdatedRecordsCompacted" : 0,       "totalLogBlocks" : 0,       
> "totalCorruptLogBlock" : 0,       "totalRollbackBlocks" : 0,       
> "fileSizeInBytes" : 478586,       "minEventTime" : null,       "maxEventTime" 
> : null     }
> ]
>   },
>   "compacted" : false,
>   "extraMetadata" : {
>     "schema" : 
> "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[
> {\"name\":\"timestamp\",\"type\":[\"null\",\"long\"],\"default\":null}
> ,{\"name\":\"_row_key\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"partition_path\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rider\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"driver\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"begin_lat\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"begin_lon\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"end_lat\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"end_lon\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"distance_in_meters\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"seconds_since_epoch\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"weight\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"nation\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"current_date\",\"type\":[\"null\",
> {\"type\":\"int\",\"logicalType\":\"date\"}
> ],\"default\":null},{\"name\":\"current_ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"height\",\"type\":[\"null\",
> {\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.source.hoodie_source.height\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}
> ],\"default\":null},{\"name\":\"city_to_state\",\"type\":[\"null\",
> {\"type\":\"map\",\"values\":[\"string\",\"null\"]}
> ],\"default\":null},{\"name\":\"fare\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"fare\",\"namespace\":\"hoodie.source.hoodie_source\",\"fields\":[
> {\"name\":\"amount\",\"type\":[\"null\",\"double\"],\"default\":null}
> ,{\"name\":\"currency\",\"type\":[\"null\",\"string\"],\"default\":null}]}],\"default\":null},{\"name\":\"tip_history\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"tip_history\",\"namespace\":\"hoodie.source.hoodie_source\",\"fields\":[
> {\"name\":\"amount\",\"type\":[\"null\",\"double\"],\"default\":null}
> ,{\"name\":\"currency\",\"type\":[\"null\",\"string\"],\"default\":null}]},\"null\"]}],\"default\":null},{\"name\":\"_hoodie_is_deleted\",\"type\":[\"null\",\"boolean\"],\"default\":null}]}",
>     "deltastreamer.checkpoint.reset_key" : "earliest"
>   },
>   "operationType" : "INSERT",
>   "fileIdAndRelativePaths" :
> {     "672dde90-af93-4f05-8519-fea783295aa6-0" : 
> "2015/03/17/672dde90-af93-4f05-8519-fea783295aa6-0_2-21-23_20220519162403646.parquet",
>      "6a1e0512-508a-4bdb-ad8f-200cda157ff0-0" : 
> "2016/03/15/6a1e0512-508a-4bdb-ad8f-200cda157ff0-0_0-21-21_20220519162403646.parquet",
>      "f3371308-8809-4644-baf6-c65c3fb86c8e-0" : 
> "2015/03/16/f3371308-8809-4644-baf6-c65c3fb86c8e-0_1-21-22_20220519162403646.parquet"
>    }
> ,
>   "writePartitionPaths" : [ "2016/03/15", "2015/03/16", "2015/03/17" ],
>   "totalLogRecordsCompacted" : 0,
>   "totalLogFilesSize" : 0,
>   "totalLogFilesCompacted" : 0,
>   "totalUpsertTime" : 0,
>   "totalCreateTime" : 0,
>   "minAndMaxEventTime" : {
>     "Optional.empty" :
> {       "val" : null,       "present" : false     }
>   },
>   "totalRecordsDeleted" : 0,
>   "totalCompactedRecordsUpdated" : 0,
>   "totalScanTime" : 0,
>   "writeStats" : [
> {     "fileId" : "6a1e0512-508a-4bdb-ad8f-200cda157ff0-0",     "path" : 
> "2016/03/15/6a1e0512-508a-4bdb-ad8f-200cda157ff0-0_0-21-21_20220519162403646.parquet",
>      "prevCommit" : "null",     "numWrites" : 342,     "numDeletes" : 0,     
> "numUpdateWrites" : 0,     "numInserts" : 342,     "totalWriteBytes" : 
> 481336,     "totalWriteErrors" : 0,     "tempPath" : null,     
> "partitionPath" : "2016/03/15",     "totalLogRecords" : 0,     
> "totalLogFilesCompacted" : 0,     "totalLogSizeCompacted" : 0,     
> "totalUpdatedRecordsCompacted" : 0,     "totalLogBlocks" : 0,     
> "totalCorruptLogBlock" : 0,     "totalRollbackBlocks" : 0,     
> "fileSizeInBytes" : 481336,     "minEventTime" : null,     "maxEventTime" : 
> null   }
> ,
> {     "fileId" : "f3371308-8809-4644-baf6-c65c3fb86c8e-0",     "path" : 
> "2015/03/16/f3371308-8809-4644-baf6-c65c3fb86c8e-0_1-21-22_20220519162403646.parquet",
>      "prevCommit" : "null",     "numWrites" : 340,     "numDeletes" : 0,     
> "numUpdateWrites" : 0,     "numInserts" : 340,     "totalWriteBytes" : 
> 481106,     "totalWriteErrors" : 0,     "tempPath" : null,     
> "partitionPath" : "2015/03/16",     "totalLogRecords" : 0,     
> "totalLogFilesCompacted" : 0,     "totalLogSizeCompacted" : 0,     
> "totalUpdatedRecordsCompacted" : 0,     "totalLogBlocks" : 0,     
> "totalCorruptLogBlock" : 0,     "totalRollbackBlocks" : 0,     
> "fileSizeInBytes" : 481106,     "minEventTime" : null,     "maxEventTime" : 
> null   }
> ,
> {     "fileId" : "672dde90-af93-4f05-8519-fea783295aa6-0",     "path" : 
> "2015/03/17/672dde90-af93-4f05-8519-fea783295aa6-0_2-21-23_20220519162403646.parquet",
>      "prevCommit" : "null",     "numWrites" : 318,     "numDeletes" : 0,     
> "numUpdateWrites" : 0,     "numInserts" : 318,     "totalWriteBytes" : 
> 478586,     "totalWriteErrors" : 0,     "tempPath" : null,     
> "partitionPath" : "2015/03/17",     "totalLogRecords" : 0,     
> "totalLogFilesCompacted" : 0,     "totalLogSizeCompacted" : 0,     
> "totalUpdatedRecordsCompacted" : 0,     "totalLogBlocks" : 0,     
> "totalCorruptLogBlock" : 0,     "totalRollbackBlocks" : 0,     
> "fileSizeInBytes" : 478586,     "minEventTime" : null,     "maxEventTime" : 
> null   }
> ]
> }
>     at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.getCheckpointToResume(DeltaSync.java:527)
>     at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:364)
>     at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:302)
>     at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:201)
>     at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
>     at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:199)
>     at 
> org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.testSqlSourceSource(TestHoodieDeltaStreamer.java:1962)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>     at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>     at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>     at 
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
>     at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
>     at 
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
>     at 
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
>     at 
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
>     at 
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
>     at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>     at 
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:87)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:53)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:66)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:51)
>     at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:87)
>     at 
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:66)
>     at 
> com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
>     at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
>     at 
> com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
>     at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
>     at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
>     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to