Hi there!

I have run into a problem: Kafka Connect's Cast transformation loses schema information (specifically, the schema name) when casting types. I have reproduced it with the following test in org.apache.kafka.connect.transforms.CastTest on the current trunk branch:

```
@SuppressWarnings("unchecked")
@Test
public void castWholeRecordValueWithSchemaBooleanAndTimestampField() {
    final Cast<SourceRecord> xform = new Cast.Value<>();
    xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64:boolean"));

    SchemaBuilder builder = SchemaBuilder.struct();
    builder.field("int64", Schema.INT64_SCHEMA);
    builder.field("timestamp", Timestamp.SCHEMA);
    Schema supportedTypesSchema = builder.build();

    Struct recordValue = new Struct(supportedTypesSchema);
    recordValue.put("int64", (long) 64);
    recordValue.put("timestamp", new java.sql.Timestamp(0L));

    // The DataException shown below is thrown from this call.
    SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
            supportedTypesSchema, recordValue));

    assertNull(transformed.valueSchema());
    assertEquals(true, ((Map<String, Object>) transformed.value()).get("int64"));
    assertEquals(new java.sql.Timestamp(0L), ((Map<String, Object>) transformed.value()).get("timestamp"));
}
```

The test fails with the following exception:

```
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.sql.Timestamp for field: "null"
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:240)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
    at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
    at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
    at org.apache.kafka.connect.transforms.CastTest.castWholeRecordValueWithSchemaBooleanAndTimestampField(CastTest.java:380)
```

This happens because Timestamp.SCHEMA has schema.type = 'INT64' and schema.name = "org.apache.kafka.connect.data.Timestamp", but the org.apache.kafka.connect.transforms.Cast#getOrBuildSchema method copies only schema.type and sets schema.name to null. As a result, the copied field is treated as a plain INT64 and the java.sql.Timestamp value is rejected when it is put into the new Struct.

In practice, this makes a connector fail when it exports data from a database table that has a timestamp field plus another field that is being cast. (I have connector settings for PostgreSQL that trigger this problem.)

Should I report a bug, or is there something I am misunderstanding?
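To make the suspected mechanism concrete, here is a minimal, self-contained sketch. It does not use the Kafka Connect classes: SimpleSchema, copyTypeOnly, copyPreservingName and accepts are hypothetical stand-ins, and the validation rule is a deliberately simplified assumption about how a named logical type and a plain INT64 differ. It only illustrates why dropping the schema name could make a Date-like value fail validation.

```java
import java.util.Date;

public class SchemaCopyDemo {
    // Hypothetical stand-in for a Connect schema: a type plus an optional logical name.
    static final class SimpleSchema {
        final String type;
        final String name; // null when the schema carries no logical name

        SimpleSchema(String type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    static final String TIMESTAMP_NAME = "org.apache.kafka.connect.data.Timestamp";

    // Mimics the behavior described above: only the type is copied, the name is dropped.
    static SimpleSchema copyTypeOnly(SimpleSchema s) {
        return new SimpleSchema(s.type, null);
    }

    // One possible alternative (an assumption, not the actual fix): keep the name too.
    static SimpleSchema copyPreservingName(SimpleSchema s) {
        return new SimpleSchema(s.type, s.name);
    }

    // Simplified validation: a named timestamp expects a Date, a plain INT64 expects a Long.
    static boolean accepts(SimpleSchema s, Object value) {
        if (TIMESTAMP_NAME.equals(s.name)) {
            return value instanceof Date;
        }
        if ("INT64".equals(s.type)) {
            return value instanceof Long;
        }
        return false;
    }

    public static void main(String[] args) {
        SimpleSchema timestamp = new SimpleSchema("INT64", TIMESTAMP_NAME);
        Date value = new Date(0L);

        // Name dropped: the value is checked against plain INT64 and rejected.
        System.out.println(accepts(copyTypeOnly(timestamp), value));       // false
        // Name kept: the value is checked against the logical type and accepted.
        System.out.println(accepts(copyPreservingName(timestamp), value)); // true
    }
}
```

In this model, the only difference between the two copies is whether the name survives, which matches the difference between the failing behavior and what I would expect.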
I have patched the source code locally to fix this problem for my particular case, and I am ready to prepare a more general patch if necessary. I have also attached a git patch file with the failing test case.

Thanks,
Artem