[ https://issues.apache.org/jira/browse/HUDI-1864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raymond Xu updated HUDI-1864: ----------------------------- Fix Version/s: 0.12.0 (was: 0.11.0) > Support for java.time.LocalDate in TimestampBasedAvroKeyGenerator > ----------------------------------------------------------------- > > Key: HUDI-1864 > URL: https://issues.apache.org/jira/browse/HUDI-1864 > Project: Apache Hudi > Issue Type: Improvement > Reporter: Vaibhav Sinha > Assignee: Vaibhav Sinha > Priority: Major > Labels: pull-request-available, query-eng, sev:high > Fix For: 0.12.0 > > > When we read data from MySQL which has a column of type {{Date}}, Spark > represents it as an instance of {{java.time.LocalDate}}. If I try and use > this column for partitioning while doing a write to Hudi, I get the following > exception > > {code:java} > Caused by: org.apache.hudi.exception.HoodieKeyGeneratorException: Unable to > parse input partition field :2021-04-21 > at > org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:136) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at > org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at > org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at > org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at > org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) > ~[scala-library-2.12.10.jar:?] > at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271) > ~[scala-library-2.12.10.jar:?] > at scala.collection.Iterator.foreach(Iterator.scala:941) > ~[scala-library-2.12.10.jar:?] > at scala.collection.Iterator.foreach$(Iterator.scala:941) > ~[scala-library-2.12.10.jar:?] > at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) > ~[scala-library-2.12.10.jar:?] > at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) > ~[scala-library-2.12.10.jar:?] > at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) > ~[scala-library-2.12.10.jar:?] > at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) > ~[scala-library-2.12.10.jar:?] > at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) > ~[scala-library-2.12.10.jar:?] > at scala.collection.AbstractIterator.to(Iterator.scala:1429) > ~[scala-library-2.12.10.jar:?] > at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) > ~[scala-library-2.12.10.jar:?] > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429) > ~[scala-library-2.12.10.jar:?] > at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) > ~[scala-library-2.12.10.jar:?] > at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) > ~[scala-library-2.12.10.jar:?] > at scala.collection.AbstractIterator.toArray(Iterator.scala:1429) > ~[scala-library-2.12.10.jar:?] > at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.scheduler.Task.run(Task.scala:131) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > ~[?:1.8.0_171] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ~[?:1.8.0_171] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171] > Caused by: org.apache.hudi.exception.HoodieNotSupportedException: Unexpected > type for partition field: java.time.LocalDate > at > org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:208) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at > org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.getPartitionPath(TimestampBasedAvroKeyGenerator.java:134) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at > org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:89) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at > org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:64) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at > org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at > org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$2(HoodieSparkSqlWriter.scala:160) > ~[hudi-spark3-bundle_2.12-0.8.0.jar:0.8.0] > at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) > ~[scala-library-2.12.10.jar:?] > at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271) > ~[scala-library-2.12.10.jar:?] > at scala.collection.Iterator.foreach(Iterator.scala:941) > ~[scala-library-2.12.10.jar:?] > at scala.collection.Iterator.foreach$(Iterator.scala:941) > ~[scala-library-2.12.10.jar:?] > at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) > ~[scala-library-2.12.10.jar:?] > at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) > ~[scala-library-2.12.10.jar:?] > at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) > ~[scala-library-2.12.10.jar:?] > at scala.collection.TraversableOnce.to(TraversableOnce.scala:315) > ~[scala-library-2.12.10.jar:?] > at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) > ~[scala-library-2.12.10.jar:?] > at scala.collection.AbstractIterator.to(Iterator.scala:1429) > ~[scala-library-2.12.10.jar:?] > at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) > ~[scala-library-2.12.10.jar:?] > at > scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) > ~[scala-library-2.12.10.jar:?] > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429) > ~[scala-library-2.12.10.jar:?] > at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) > ~[scala-library-2.12.10.jar:?] > at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) > ~[scala-library-2.12.10.jar:?] > at scala.collection.AbstractIterator.toArray(Iterator.scala:1429) > ~[scala-library-2.12.10.jar:?] > at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.scheduler.Task.run(Task.scala:131) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) > ~[spark-core_2.12-3.1.1.jar:3.1.1] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > ~[?:1.8.0_171] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ~[?:1.8.0_171] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_171] > {code} > Currently, the only supported column types are > {code:java} > public String getPartitionPath(Object partitionVal) { > initIfNeeded(); > long timeMs; > if (partitionVal instanceof Double) { > timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue()); > } else if (partitionVal instanceof Float) { > timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue()); > } else if (partitionVal instanceof Long) { > timeMs = convertLongTimeToMillis((Long) partitionVal); > } else if (partitionVal instanceof CharSequence) { > if (!inputFormatter.isPresent()) { > throw new HoodieException("Missing inputformatter. Ensure " + > Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType > is DATE_STRING or MIXED!"); > } > DateTime parsedDateTime = > inputFormatter.get().parseDateTime(partitionVal.toString()); > if (this.outputDateTimeZone == null) { > // Use the timezone that came off the date that was passed in, if it > had one > partitionFormatter = > partitionFormatter.withZone(parsedDateTime.getZone()); > } timeMs = > inputFormatter.get().parseDateTime(partitionVal.toString()).getMillis(); > } else { > throw new HoodieNotSupportedException( > "Unexpected type for partition field: " + > partitionVal.getClass().getName()); > } > DateTime timestamp = new DateTime(timeMs, outputDateTimeZone); > String partitionPath = timestamp.toString(partitionFormatter); > if (encodePartitionPath) { > try { > partitionPath = URLEncoder.encode(partitionPath, > StandardCharsets.UTF_8.toString()); > } catch (UnsupportedEncodingException uoe) { > throw new HoodieException(uoe.getMessage(), uoe); > } > } > return hiveStylePartitioning ? getPartitionPathFields().get(0) + "=" + > partitionPath : partitionPath; > } > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)