Hi,

Thanks for the report. Logistics: please subscribe to the dev ML, to be able to send responses: http://hudi.apache.org/community.html
Seems like we are unable to convert the JSON into a GenericRecord using the supplied schema. Can you write a small program to first check whether that JSON matches the schema? Also, curious about your changes around KafkaCluster and the other Spark Streaming classes. Mind throwing together a PR, so we can see if these can be fixed in the main source as well?

Thanks
Vinoth

On Fri, Mar 29, 2019 at 7:43 AM Netsanet Gebretsadkan <netsi22...@gmail.com> wrote:

> I published a JSON file to Kafka and ran the Hudi DeltaStreamer as a
> Spark job with Kafka as the main data source. Since I am using Kafka
> version 1.1, I had to make changes to the Kafka offset generation class
> and also the JSON Kafka source class, because Hudi is using a deprecated
> class, KafkaCluster. Now I can connect to Kafka and consume some values,
> but I am facing one problem, which is:
>
> Can not instantiate value of type [map type; class java.util.LinkedHashMap,
> [simple type, class java.lang.Object] -> [simple type, class
> java.lang.Object]] from String value ('{"volume": 483951, "symbol": "MSFT",
> "ts": "2018-08-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55,
> "key": "MSFT_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close":
> 111.72, "open": 111.55, "day": "31"}
> '); no single-String constructor/factory method
>  at [Source: "{\"volume\": 483951, \"symbol\": \"MSFT\", \"ts\":
> \"2018-08-31 09:30:00\", \"month\": \"08\", \"high\": 111.74, \"low\":
> 111.55, \"key\": \"MSFT_2018-08-31 09\", \"year\": 2018, \"date\":
> \"2018/08/31\", \"close\": 111.72, \"open\": 111.55, \"day\": \"31\"}\n";
> line: 1, column: 1]
>  at com.uber.hoodie.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:51)
>  at com.uber.hoodie.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:86)
>  at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
> Would you please give me some ideas on how to solve this problem?
>
> The toRDD method in JsonKafkaSource.java looks like below, and I am using
> StringDeserializer:
>
> private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
>   return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(),
>       offsetRanges, LocationStrategies.PreferConsistent())
>       .map(s -> s.value().toString());
> }
>
> Thanks for your consideration and looking forward to hearing from you.
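For the "small program" suggested above, here is a minimal sketch of one way to check whether the JSON matches the schema, using plain Avro rather than Hudi's MercifulJsonConverter. The `Tick` record name and the two-field schema are illustrative assumptions, not taken from the actual pipeline; substitute your real target schema and payload.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;

public class JsonSchemaCheck {
    public static void main(String[] args) throws Exception {
        // Illustrative schema covering two of the fields from the failing record.
        String schemaJson = "{\"type\":\"record\",\"name\":\"Tick\",\"fields\":["
                + "{\"name\":\"volume\",\"type\":\"long\"},"
                + "{\"name\":\"symbol\",\"type\":\"string\"}]}";
        // One JSON record as it would arrive from the Kafka topic.
        String payload = "{\"volume\": 483951, \"symbol\": \"MSFT\"}";

        Schema schema = new Schema.Parser().parse(schemaJson);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, payload);

        // Throws AvroTypeException if the JSON does not match the schema.
        GenericRecord record = reader.read(null, decoder);
        System.out.println(record);
    }
}
```

If the read succeeds, the payload is at least structurally compatible with the schema; if it throws, the problem is in the data or schema rather than the Kafka plumbing. One caveat: Avro's JsonDecoder expects Avro's own JSON encoding (e.g., unions are wrapped in a type tag), which differs from plain JSON for schemas with unions, so a failure here does not always imply the MercifulJsonConverter path will fail.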