[ 
https://issues.apache.org/jira/browse/KAFKA-6290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16933596#comment-16933596
 ] 

ASF GitHub Bot commented on KAFKA-6290:
---------------------------------------

ncliang commented on pull request #7371: KAFKA-6290: Support casting from 
logical types in cast transform
URL: https://github.com/apache/kafka/pull/7371
 
 
   This PR adds support for cast transforms to cast from logical types Date, 
Time, and Timestamp to
   their internal int32 or int64 representations. Any valid cast on their 
internal representations
   would also be applicable. For instance, Time has internal int32 
representation, but can be cast to
   int64 if desired.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect cast transformation should support logical types
> --------------------------------------------------------------
>
>                 Key: KAFKA-6290
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6290
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Sudhir Pradhan
>            Priority: Major
>              Labels: confluent-kafka, confluentic, connect, connect-api, 
> connect-transformation, kafka, kafka-connect, transform
>
> I am facing same issue when consuming from KAFKA to HDFS with CAST 
> TRANSFORMS. Any pointer please.
> My Connector :
> *********************
> {code:java}
> {
>  "name": "hdfs-sink-avro-cast-test-stndln",
>  "config": {
>   "key.converter": "io.confluent.connect.avro.AvroConverter",
>   "key.converter.schema.registry.url": "http://localhost:8081";,
>   "value.converter": "io.confluent.connect.avro.AvroConverter",
>   "value.converter.schema.registry.url": "http://localhost:8081";,
>   "key.converter.schemas.enable": "true",
>   "value.converter.schemas.enable": "true",
>   "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
>   "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
>   "internal.key.converter.schemas.enable": "false",
>   "internal.value.converter.schemas.enable": "false",
>   "offset.storage.file.filename": 
> "/tmp/connect.offsets.avroHdfsConsumer.casttest.stndln",
>   "offset.flush.interval.ms": "500",
>   "parse.key": "true",
>   "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
>   "hadoop.home": "/usr/lib/hadoop",
>   "hdfs.url": "hdfs://ip-127-34-56-789.us-east-1.compute.interna:8020",
>   "topics": "avro_raw_KFK_SRP_USER_TEST_V,avro_raw_KFK_SRP_PG_HITS_TEST_V",
>   "tasks.max": "1",
>   "topics.dir": "/home/hadoop/kafka/data/streams/in/raw/casttest1",
>   "logs.dir": "/home/hadoop/kafka/wal/streams/in/raw/casttest1",
>   "hive.integration": "true",
>   "hive.metastore.uris": 
> "thrift://ip-127-34-56-789.us-east-1.compute.internal:9083",
>   "schema.compatibility": "BACKWARD",
>   "flush.size": "10000",
>   "rotate.interval.ms": "1000",
>   "mode": "timestamp",
>   "transforms": "Cast",
>   "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
>   "transforms.Cast.spec": "residuals:float64,comp:float64"
>  }
> }
> {code}
> Exception :
> *************
> {code:java}
> [2017-11-16 01:14:39,719] ERROR Task hdfs-sink-avro-cast-test-stndln-0 threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:148)
> org.apache.kafka.connect.errors.DataException: Invalid Java object for schema 
> type INT64: class java.util.Date for field: "null"
>         at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
>         at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
>         at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
>         at 
> org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
>         at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
>         at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:414)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
>         at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2017-11-16 01:14:39,719] ERROR Task is being killed and will not recover 
> until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)
> [2017-11-16 01:14:39,719] INFO Shutting down Hive executor service. 
> (io.confluent.connect.hdfs.DataWriter:309)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to