[ https://issues.apache.org/jira/browse/HUDI-1770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vinoth Govindarajan updated HUDI-1770:
--------------------------------------
    Affects Version/s: 0.8.0

Deltastreamer throws errors when not running frequently
-------------------------------------------------------

                 Key: HUDI-1770
                 URL: https://issues.apache.org/jira/browse/HUDI-1770
             Project: Apache Hudi
          Issue Type: Bug
          Components: DeltaStreamer
    Affects Versions: 0.8.0
            Reporter: Vinoth Govindarajan
            Priority: Major

When DeltaStreamer uses HoodieIncrSource to read from another (parent) Hudi table, it runs into the error below if the DeltaStreamer pipeline is not run frequently enough.

{code:java}
User class threw exception: org.apache.spark.sql.AnalysisException: Path does not exist: hdfs:///tmp/delta_streamer_test/datestr=2021-03-30/f64f3420-4e03-4835-ab06-5d73cb953aa9-0_3-4-91_20210402163524.parquet;
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:641)
    at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:151)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:306)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:106)
    at org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:104)
    at org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:95)
    at org.apache.hudi.HoodieSparkUtils.createRdd(HoodieSparkUtils.scala)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$4(DeltaSync.java:380)
    at org.apache.hudi.common.util.Option.map(Option.java:107)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:263)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:170)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:168)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:470)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:690)
{code}

I suspect this is caused by an inconsistency between how many commits the cleaner retains and the archival process that trims commit files from the timeline ([source|https://github.com/apache/hudi/blob/fe16d0de7c76105775c887b700751241bc82624c/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java#L123]):

{code:java}
private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
{code}

These "path does not exist" errors are for commits whose commit files are still present in the .hoodie folder but whose data files have already been removed by the cleaner. DeltaStreamer lists all of the commits and selects the latest commit file, but when the data files written by that commit are no longer there, it throws this error.

To recreate this error, read from a Hudi table with HoodieIncrSource and schedule your pipeline to run only twice a day (provided the cleaner cleans up commits within that interval).

*Possible Solution:*
Merge these two configs into one, and avoid this kind of inconsistent state in HDFS.
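Until the configs are unified, one way to make the problem less likely is to keep data files around on the parent table for longer than the largest gap between downstream incremental reads. Below is a minimal sketch of the relevant writer-side properties for the parent table; the property keys are the standard Hudi cleaner/archival configs shown above, but the values are illustrative assumptions (sized here for a consumer that runs twice a day), not recommendations, and this only narrows the window rather than fixing the underlying inconsistency:

{code}
# Writer-side properties on the parent (upstream) Hudi table -- example values only.
# Retain data files for enough commits to cover the longest gap between
# downstream HoodieIncrSource/DeltaStreamer runs.
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained=30
# Archival bounds raised accordingly, keeping hoodie.keep.min.commits greater
# than the cleaner retention as the archival logic expects.
hoodie.keep.min.commits=40
hoodie.keep.max.commits=50
{code}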
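For completeness, the reproduction setup described above can be sketched as a downstream DeltaStreamer job reading the parent table through HoodieIncrSource, scheduled to run only twice a day. This is an illustrative command, not the one from the report: the bundle jar path, target table name/path and ordering field are placeholders, and only the parent table base path (hdfs:///tmp/delta_streamer_test) is taken from the stack trace above.

{code}
# Downstream DeltaStreamer reading the parent table incrementally (sketch; paths/names are placeholders).
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.HoodieIncrSource \
  --source-ordering-field ts \
  --target-base-path hdfs:///tmp/hudi/downstream_table \
  --target-table downstream_table \
  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=hdfs:///tmp/delta_streamer_test
{code}

If the parent table commits frequently, a later run of this job should hit the AnalysisException above once the cleaner has removed data files for commits that are still on the active timeline.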