[ https://issues.apache.org/jira/browse/KAFKA-6290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch resolved KAFKA-6290. ---------------------------------- Fix Version/s: 2.3.1 2.4.0 2.2.2 2.1.2 2.0.2 1.1.2 Reviewer: Randall Hauch Resolution: Fixed Merged back to the `1.1` branch > Kafka Connect cast transformation should support logical types > -------------------------------------------------------------- > > Key: KAFKA-6290 > URL: https://issues.apache.org/jira/browse/KAFKA-6290 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 0.11.0.0 > Reporter: Sudhir Pradhan > Assignee: Nigel Liang > Priority: Major > Labels: confluent-kafka, confluentic, connect, connect-api, > connect-transformation, kafka, kafka-connect, transform > Fix For: 1.1.2, 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.3.1 > > > I am facing the same issue when consuming from KAFKA to HDFS with CAST > TRANSFORMS. Any pointers, please. > My Connector : > ********************* > {code:java} > { > "name": "hdfs-sink-avro-cast-test-stndln", > "config": { > "key.converter": "io.confluent.connect.avro.AvroConverter", > "key.converter.schema.registry.url": "http://localhost:8081", > "value.converter": "io.confluent.connect.avro.AvroConverter", > "value.converter.schema.registry.url": "http://localhost:8081", > "key.converter.schemas.enable": "true", > "value.converter.schemas.enable": "true", > "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter", > "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter", > "internal.key.converter.schemas.enable": "false", > "internal.value.converter.schemas.enable": "false", > "offset.storage.file.filename": > "/tmp/connect.offsets.avroHdfsConsumer.casttest.stndln", > "offset.flush.interval.ms": "500", > "parse.key": "true", > "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", > "hadoop.home": "/usr/lib/hadoop", > "hdfs.url": "hdfs://ip-127-34-56-789.us-east-1.compute.interna:8020", > "topics": "avro_raw_KFK_SRP_USER_TEST_V,avro_raw_KFK_SRP_PG_HITS_TEST_V", > "tasks.max": "1", > "topics.dir": 
"/home/hadoop/kafka/data/streams/in/raw/casttest1", > "logs.dir": "/home/hadoop/kafka/wal/streams/in/raw/casttest1", > "hive.integration": "true", > "hive.metastore.uris": > "thrift://ip-127-34-56-789.us-east-1.compute.internal:9083", > "schema.compatibility": "BACKWARD", > "flush.size": "10000", > "rotate.interval.ms": "1000", > "mode": "timestamp", > "transforms": "Cast", > "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value", > "transforms.Cast.spec": "residuals:float64,comp:float64" > } > } > {code} > Exception : > ************* > {code:java} > [2017-11-16 01:14:39,719] ERROR Task hdfs-sink-avro-cast-test-stndln-0 threw > an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:148) > org.apache.kafka.connect.errors.DataException: Invalid Java object for schema > type INT64: class java.util.Date for field: "null" > at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239) > at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209) > at org.apache.kafka.connect.data.Struct.put(Struct.java:214) > at > org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152) > at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:414) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2017-11-16 01:14:39,719] ERROR Task is being killed and will not recover > until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149) > [2017-11-16 01:14:39,719] INFO Shutting down Hive executor service. > (io.confluent.connect.hdfs.DataWriter:309) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)