Kengo Seki created CAMEL-19586:
----------------------------------

             Summary: camel-parquet-avro - Allow users to unmarshal Parquet file into Avro's GenericRecords
                 Key: CAMEL-19586
                 URL: https://issues.apache.org/jira/browse/CAMEL-19586
             Project: Camel
          Issue Type: Improvement
            Reporter: Kengo Seki
            Assignee: Kengo Seki


Currently, the Parquet-Avro data format requires users to define a POJO class
for marshalling and unmarshalling. This is inconvenient, especially when
unmarshalling an existing Parquet file with a complicated data structure.

[Avro provides GenericRecord for such cases|https://avro.apache.org/docs/1.11.1/getting-started-java/#serializing-and-deserializing-without-code-generation],
but the current data format does not support it yet, as the following example shows:

{code}
$ cat example.java
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.slf4j:slf4j-simple:1.7.31
//DEPS org.apache.camel:camel-bom:4.0.0-M3@pom
//DEPS org.apache.camel:camel-core
//DEPS org.apache.camel:camel-main
//DEPS org.apache.camel:camel-parquet-avro:4.0.0-SNAPSHOT
//DEPS org.apache.hadoop:hadoop-client:3.3.6

import org.apache.avro.generic.GenericRecord;
import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.dataformat.parquet.avro.*;
import org.apache.camel.main.*;
import org.apache.camel.spi.*;
import static org.apache.camel.builder.PredicateBuilder.*;

class example {

    public static void main(String... args) throws Exception {
        System.setProperty("org.slf4j.simpleLogger.logFile", "System.out");

        Main main = new Main();
        ParquetAvroDataFormat format = new ParquetAvroDataFormat();
        format.setUnmarshalType(GenericRecord.class);
        main.configure().addRoutesBuilder(new RouteBuilder() {
            public void configure() throws Exception {
                from("file:/tmp?fileName=example1.parquet&noop=true")
                    .unmarshal(format)
                    .marshal(format)
                    .log("${body}");
            }
        });
        main.run();
    }
}
$ jbang example.java

...

[Camel (camel-1) thread #1 - file:///tmp] ERROR org.apache.camel.processor.errorhandler.DefaultErrorHandler - Failed delivery for (MessageId: 356E1287483C55C-0000000000000000 on ExchangeId: 356E1287483C55C-0000000000000000). Exhausted after delivery attempt: 1 caught: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord

Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source                                   ID                             Processor                                          Elapsed (ms)
                                         route1/route1                  from[file:///tmp?fileName=example1.parquet&noop=tr     89591203
        ...
                                         route1/marshal1                marshal[org.apache.camel.model.DataFormatDefinitio            0

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
        at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:403)
        at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:734)
        at org.apache.avro.specific.SpecificData.lambda$new$3(SpecificData.java:337)
        at org.apache.avro.util.internal.ClassValueCache$1.computeValue(ClassValueCache.java:35)
        at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
        at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
        at java.base/java.lang.ClassValue.get(ClassValue.java:116)
        at org.apache.avro.util.internal.ClassValueCache.apply(ClassValueCache.java:45)
        at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:346)
        at org.apache.camel.dataformat.parquet.avro.ParquetAvroDataFormat.marshal(ParquetAvroDataFormat.java:70)
        at org.apache.camel.support.processor.MarshalProcessor.process(MarshalProcessor.java:64)
        at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:475)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
        at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:164)
        at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:379)
        at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:491)
        at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:244)
        at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:205)
        at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202)
        at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
{code}

So I'd like to propose a new feature that unmarshals Parquet data into Avro's
GenericRecord (and marshals GenericRecord back into Parquet) when no POJO
class is specified as unmarshalType.
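For illustration, here is a minimal sketch of what such a generic path could do internally, written against the plain parquet-avro API rather than Camel. It assumes parquet-avro 1.13.1, avro 1.11.1 and hadoop-client 3.3.6 on the classpath; the class name, schema and file path are hypothetical, and this is not the eventual camel-parquet-avro implementation. The record is built from a schema parsed at runtime (no generated class or POJO), the writer takes its schema from the record itself instead of calling SpecificData.getSchema() on a class (the call that fails above), and the reader returns GenericRecords whose schema comes from the file's metadata.

{code:java}
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.apache.avro:avro:1.11.1
//DEPS org.apache.parquet:parquet-avro:1.13.1
//DEPS org.apache.hadoop:hadoop-client:3.3.6

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

// Hypothetical example class; not part of Camel.
class GenericRoundTrip {

    // Hypothetical schema, parsed at runtime: no generated class and no POJO involved.
    static final String SCHEMA_JSON = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}";

    // "Marshal" step: the schema is taken from the records themselves,
    // not derived from a class via SpecificData.getSchema(...).
    static void write(Path path, List<GenericRecord> records, Configuration conf) throws Exception {
        Schema schema = records.get(0).getSchema();
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(HadoopOutputFile.fromPath(path, conf))
                .withSchema(schema)
                .withDataModel(GenericData.get())
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .build()) {
            for (GenericRecord r : records) {
                writer.write(r);
            }
        }
    }

    // "Unmarshal" step: every row becomes a GenericRecord; the schema comes from the file.
    // Note: string fields may come back as org.apache.avro.util.Utf8 rather than String.
    static List<GenericRecord> read(Path path, Configuration conf) throws Exception {
        List<GenericRecord> result = new ArrayList<>();
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(HadoopInputFile.fromPath(path, conf))
                .withDataModel(GenericData.get())
                .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                result.add(record);
            }
        }
        return result;
    }

    public static void main(String... args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/generic-example.parquet"); // hypothetical path

        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        GenericRecord alice = new GenericData.Record(schema);
        alice.put("name", "Alice");
        alice.put("age", 30);

        write(path, List.of(alice), conf);
        // Prints the single record read back from the Parquet file.
        System.out.println(read(path, conf));
    }
}
{code}

It can be run with jbang in the same way as the example above. Passing GenericData.get() as the data model explicitly requests generic records instead of relying on the default data model.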



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
