Hi,

Thanks for the report. Logistics: please subscribe to the dev mailing list so you
can send and receive responses.
http://hudi.apache.org/community.html

It seems we are unable to convert the JSON into a GenericRecord using the
supplied schema. Could you write a small program to first check whether that
JSON actually matches the schema?
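As a first step before comparing against the Avro schema, it may be worth checking the shape of the raw Kafka payload itself. Jackson's "no single-String constructor/factory method" error for a Map target usually means the parser saw a JSON *string literal* where an object was expected, which can happen when the producer JSON-encodes the record twice. A minimal sketch of such a check (pure JDK, no Hudi dependencies; the class and method names are hypothetical, for illustration only):

```java
// Illustrative only: a quick structural sanity check on the raw Kafka payload.
public class PayloadShapeCheck {

    // True if the payload is a top-level JSON object ({...});
    // false if it looks like a JSON string literal ("..."),
    // i.e. double-encoded JSON.
    static boolean looksLikeJsonObject(String payload) {
        String t = payload.trim();
        return t.startsWith("{") && t.endsWith("}");
    }

    public static void main(String[] args) {
        String plain = "{\"volume\": 483951, \"symbol\": \"MSFT\"}";
        // Same record, but serialized a second time as a JSON string:
        String doubleEncoded = "\"{\\\"volume\\\": 483951, \\\"symbol\\\": \\\"MSFT\\\"}\"";

        System.out.println(looksLikeJsonObject(plain));         // true
        System.out.println(looksLikeJsonObject(doubleEncoded)); // false
    }
}
```

If the check fails, fixing the producer to publish the raw JSON object (rather than a JSON-encoded string) would be the thing to try first; if it passes, the next step is comparing field names and types against the Avro schema.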

Also, I am curious about your changes around KafkaCluster and the other Spark
Streaming classes. Would you mind putting together a PR, so we can see whether
these can be fixed in the main source as well?

Thanks
Vinoth

On Fri, Mar 29, 2019 at 7:43 AM Netsanet Gebretsadkan <netsi22...@gmail.com>
wrote:

> I published a JSON file to Kafka and ran the Hudi DeltaStreamer as a
> Spark job with Kafka as the main data source. Since I am using Kafka 1.1,
> I had to make changes to the Kafka offset generation class and the
> JSON Kafka source class, because Hudi uses a deprecated class,
> KafkaCluster. Now I can connect to Kafka and consume some values, but I am
> facing one problem, namely:
>
>
> Can not instantiate value of type [map type; class java.util.LinkedHashMap,
> [simple type, class java.lang.Object] -> [simple type, class java.lang.Object]]
> from String value ('{"volume": 483951, "symbol": "MSFT",
> "ts": "2018-08-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55,
> "key": "MSFT_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close":
> 111.72, "open": 111.55, "day": "31"}'); no single-String constructor/factory method
>
>  at [Source: "{\"volume\": 483951, \"symbol\": \"MSFT\", \"ts\":
> \"2018-08-31 09:30:00\", \"month\": \"08\", \"high\": 111.74, \"low\":
> 111.55, \"key\": \"MSFT_2018-08-31 09\", \"year\": 2018, \"date\":
> \"2018/08/31\", \"close\": 111.72, \"open\": 111.55, \"day\": \"31\"}\n";
> line: 1, column: 1]
>
>     at com.uber.hoodie.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:51)
>     at com.uber.hoodie.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:86)
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>
>
>
> Would you please give me some ideas on how to solve this problem?
>
>
> The toRDD method in JsonKafkaSource.java looks like the one below, and I am
> using StringDeserializer.
>
>
> private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
>   return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(),
>       offsetRanges, LocationStrategies.PreferConsistent())
>       .map(s -> s.value().toString());
> }
>
>
> Thanks for your consideration; I look forward to hearing from you.
>
