[ 
https://issues.apache.org/jira/browse/HUDI-1770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Govindarajan updated HUDI-1770:
--------------------------------------
    Affects Version/s: 0.8.0

> Deltastreamer throws errors when not running frequently
> -------------------------------------------------------
>
>                 Key: HUDI-1770
>                 URL: https://issues.apache.org/jira/browse/HUDI-1770
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer
>    Affects Versions: 0.8.0
>            Reporter: Vinoth Govindarajan
>            Priority: Major
>
> When delta streamer is using HoodieIncrSource from another parent Hudi table, 
> it runs into this error, when you are not running your delta streamer 
> pipeline frequently.
>  
> {code:java}
> User class threw exception: org.apache.spark.sql.AnalysisException: Path does 
> not exist: 
> hdfs:///tmp/delta_streamer_test/datestr=2021-03-30/f64f3420-4e03-4835-ab06-5d73cb953aa9-0_3-4-91_20210402163524.parquet;
>       at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
>       at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at scala.collection.immutable.List.foreach(List.scala:392)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>       at scala.collection.immutable.List.flatMap(List.scala:355)
>       at 
> org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
>       at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
>       at 
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>       at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:641)
>       at 
> org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:151)
>       at 
> org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:306)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>       at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
>       at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:85)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:106)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:106)
>       at 
> org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:104)
>       at 
> org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:95)
>       at org.apache.hudi.HoodieSparkUtils.createRdd(HoodieSparkUtils.scala)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$4(DeltaSync.java:380)
>       at org.apache.hudi.common.util.Option.map(Option.java:107)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:263)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:170)
>       at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:168)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:470)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:690)
> {code}
>  
> I guess this is because of the inconsistency with the cleaner commit retained 
> and archival process which cleans up the commit files 
> ([source|https://github.com/apache/hudi/blob/fe16d0de7c76105775c887b700751241bc82624c/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java#L123]):
> {code:java}
> private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
> private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
> private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
> private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
> {code}
>  
> These file not found error is for the commits which are present in .hoodie 
> folder but the actual version is cleaned by the cleaner, delta streamer is 
> doing a listing of all the commits and selects the latest commit file, but 
> when the actual committed version file is not there, it throws this error.
>  
> To recreate this error, you need to read from a hoodie table with 
> HoodieIncrSource and schedule your pipeline to run twice a day (provided the 
> cleaner cleans the commits in that interval).
>  
> *Possible Solution:*
> Merge these two configs into one, and avoid this kind of inconsistent state 
> in the HDFS.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to