This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push: new c05ed1e KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) c05ed1e is described below commit c05ed1eae466ef8afd8d67022d206d7d9bb24838 Author: Robert Yokota <rayok...@gmail.com> AuthorDate: Fri Jul 12 10:12:20 2019 -0700 KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070) Fix handling of nulls in TimestampConverter. Authors: Valeria Vasylieva <valeria.vasyli...@gmail.com>, Robert Yokota <rayok...@gmail.com> Reviewers: Arjun Satish <ar...@confluent.io>, Randall Hauch <rha...@gmail.com> --- .../connect/transforms/TimestampConverter.java | 52 +++-- .../connect/transforms/TimestampConverterTest.java | 233 +++++++++++++++++++-- 2 files changed, 243 insertions(+), 42 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java index 8557441..f32253e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java @@ -47,7 +47,7 @@ import java.util.Set; import java.util.TimeZone; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; -import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> { @@ -85,6 +85,10 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); + public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); + public static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema(); + private interface TimestampTranslator { /** * Convert from the type-specific format to the universal java.util.Date format @@ -94,7 +98,7 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements /** * Get the schema for this format. */ - Schema typeSchema(); + Schema typeSchema(boolean isOptional); /** * Convert from the universal java.util.Date format to the type-specific format @@ -118,8 +122,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements } @Override - public Schema typeSchema() { - return Schema.STRING_SCHEMA; + public Schema typeSchema(boolean isOptional) { + return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; } @Override @@ -139,8 +143,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements } @Override - public Schema typeSchema() { - return Schema.INT64_SCHEMA; + public Schema typeSchema(boolean isOptional) { + return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA; } @Override @@ -159,8 +163,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements } @Override - public Schema typeSchema() { - return org.apache.kafka.connect.data.Date.SCHEMA; + public Schema typeSchema(boolean isOptional) { + return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA; } @Override @@ -185,8 +189,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements } @Override - public Schema typeSchema() { - return Time.SCHEMA; + public Schema typeSchema(boolean isOptional) { + return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA; } @Override @@ -212,8 +216,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements } @Override - public Schema typeSchema() { - return Timestamp.SCHEMA; + public Schema typeSchema(boolean isOptional) { + return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA; } @Override @@ -330,16 +334,16 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements if (config.field.isEmpty()) { Object value = operatingValue(record); // New schema is determined by the requested target timestamp type - Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(); + Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(schema.isOptional()); return newRecord(record, updatedSchema, convertTimestamp(value, timestampTypeFromSchema(schema))); } else { - final Struct value = requireStruct(operatingValue(record), PURPOSE); - Schema updatedSchema = schemaUpdateCache.get(value.schema()); + final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); + Schema updatedSchema = schemaUpdateCache.get(schema); if (updatedSchema == null) { SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); for (Field field : schema.fields()) { if (field.name().equals(config.field)) { - builder.field(field.name(), TRANSLATORS.get(config.type).typeSchema()); + builder.field(field.name(), TRANSLATORS.get(config.type).typeSchema(field.schema().isOptional())); } else { builder.field(field.name(), field.schema()); } @@ -361,6 +365,9 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements } private Struct applyValueWithSchema(Struct value, Schema updatedSchema) { + if (value == null) { + return null; + } Struct updatedValue = new Struct(updatedSchema); for (Field field : value.schema().fields()) { final Object updatedFieldValue; @@ -375,11 +382,11 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements } private R applySchemaless(R record) { - if (config.field.isEmpty()) { - Object value = operatingValue(record); - return newRecord(record, null, convertTimestamp(value)); + Object rawValue = operatingValue(record); + if (rawValue == null || config.field.isEmpty()) { + return newRecord(record, null, convertTimestamp(rawValue)); } else { - final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE); + final Map<String, Object> value = requireMap(rawValue, PURPOSE); final HashMap<String, Object> updatedValue = new HashMap<>(value); updatedValue.put(config.field, convertTimestamp(value.get(config.field))); return newRecord(record, null, updatedValue); @@ -424,11 +431,14 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements /** * Convert the given timestamp to the target timestamp format. - * @param timestamp the input timestamp + * @param timestamp the input timestamp, may be null * @param timestampFormat the format of the timestamp, or null if the format should be inferred * @return the converted timestamp */ private Object convertTimestamp(Object timestamp, String timestampFormat) { + if (timestamp == null) { + return null; + } if (timestampFormat == null) { timestampFormat = inferTimestampType(timestamp); } diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java index 475066f..3a1920e 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.Map; import java.util.TimeZone; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -105,13 +106,12 @@ public class TimestampConverterTest { xformValue.configure(config); } - // Conversions without schemas (most flexible Timestamp -> other types) @Test public void testSchemalessIdentity() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime())); assertNull(transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -120,7 +120,7 @@ public class TimestampConverterTest { @Test public void testSchemalessTimestampToDate() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime())); assertNull(transformed.valueSchema()); assertEquals(DATE.getTime(), transformed.value()); @@ -129,7 +129,7 @@ public class TimestampConverterTest { @Test public void testSchemalessTimestampToTime() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime())); assertNull(transformed.valueSchema()); assertEquals(TIME.getTime(), transformed.value()); @@ -138,7 +138,7 @@ public class TimestampConverterTest { @Test public void testSchemalessTimestampToUnix() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime())); assertNull(transformed.valueSchema()); assertEquals(DATE_PLUS_TIME_UNIX, transformed.value()); @@ -150,7 +150,7 @@ public class TimestampConverterTest { config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); xformValue.configure(config); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime())); assertNull(transformed.valueSchema()); assertEquals(DATE_PLUS_TIME_STRING, transformed.value()); @@ -162,7 +162,7 @@ public class TimestampConverterTest { @Test public void testSchemalessDateToTimestamp() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime())); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE.getTime())); assertNull(transformed.valueSchema()); // No change expected since the source type is coarser-grained @@ -172,7 +172,7 @@ public class TimestampConverterTest { @Test public void testSchemalessTimeToTimestamp() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(TIME.getTime())); assertNull(transformed.valueSchema()); // No change expected since the source type is coarser-grained @@ -182,7 +182,7 @@ public class TimestampConverterTest { @Test public void testSchemalessUnixToTimestamp() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX)); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_UNIX)); assertNull(transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -194,7 +194,7 @@ public class TimestampConverterTest { config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); xformValue.configure(config); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING)); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_STRING)); assertNull(transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -206,7 +206,7 @@ public class TimestampConverterTest { @Test public void testWithSchemaIdentity() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); assertEquals(Timestamp.SCHEMA, transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -215,7 +215,7 @@ public class TimestampConverterTest { @Test public void testWithSchemaTimestampToDate() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); assertEquals(Date.SCHEMA, transformed.valueSchema()); assertEquals(DATE.getTime(), transformed.value()); @@ -224,7 +224,7 @@ public class TimestampConverterTest { @Test public void testWithSchemaTimestampToTime() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); assertEquals(Time.SCHEMA, transformed.valueSchema()); assertEquals(TIME.getTime(), transformed.value()); @@ -233,7 +233,7 @@ public class TimestampConverterTest { @Test public void testWithSchemaTimestampToUnix() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema()); assertEquals(DATE_PLUS_TIME_UNIX, transformed.value()); @@ -245,19 +245,70 @@ public class TimestampConverterTest { config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); xformValue.configure(config); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema()); assertEquals(DATE_PLUS_TIME_STRING, transformed.value()); } + // Null-value conversions schemaless + + @Test + public void testSchemalessNullValueToString() { + testSchemalessNullValueConversion("string"); + testSchemalessNullFieldConversion("string"); + } + @Test + public void testSchemalessNullValueToDate() { + testSchemalessNullValueConversion("Date"); + testSchemalessNullFieldConversion("Date"); + } + @Test + public void testSchemalessNullValueToTimestamp() { + testSchemalessNullValueConversion("Timestamp"); + testSchemalessNullFieldConversion("Timestamp"); + } + @Test + public void testSchemalessNullValueToUnix() { + testSchemalessNullValueConversion("unix"); + testSchemalessNullFieldConversion("unix"); + } + + @Test + public void testSchemalessNullValueToTime() { + testSchemalessNullValueConversion("Time"); + testSchemalessNullFieldConversion("Time"); + } + + private void testSchemalessNullValueConversion(String targetType) { + Map<String, String> config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType); + config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); + xformValue.configure(config); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(null)); + + assertNull(transformed.valueSchema()); + assertNull(transformed.value()); + } + + private void testSchemalessNullFieldConversion(String targetType) { + Map<String, String> config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType); + config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); + config.put(TimestampConverter.FIELD_CONFIG, "ts"); + xformValue.configure(config); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(null)); + + assertNull(transformed.valueSchema()); + assertNull(transformed.value()); + } // Conversions with schemas (core types -> most flexible Timestamp format) @Test public void testWithSchemaDateToTimestamp() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime())); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(Date.SCHEMA, DATE.getTime())); assertEquals(Timestamp.SCHEMA, transformed.valueSchema()); // No change expected since the source type is coarser-grained @@ -267,7 +318,7 @@ public class TimestampConverterTest { @Test public void testWithSchemaTimeToTimestamp() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime())); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(Time.SCHEMA, TIME.getTime())); assertEquals(Timestamp.SCHEMA, transformed.valueSchema()); // No change expected since the source type is coarser-grained @@ -277,7 +328,7 @@ public class TimestampConverterTest { @Test public void testWithSchemaUnixToTimestamp() { xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX)); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX)); assertEquals(Timestamp.SCHEMA, transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -289,12 +340,145 @@ public class TimestampConverterTest { config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); xformValue.configure(config); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING)); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING)); assertEquals(Timestamp.SCHEMA, transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); } + // Null-value conversions with schema + + @Test + public void testWithSchemaNullValueToTimestamp() { + testWithSchemaNullValueConversion("Timestamp", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); + testWithSchemaNullValueConversion("Timestamp", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); + testWithSchemaNullValueConversion("Timestamp", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); + testWithSchemaNullValueConversion("Timestamp", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); + testWithSchemaNullValueConversion("Timestamp", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); + } + + @Test + public void testWithSchemaNullFieldToTimestamp() { + testWithSchemaNullFieldConversion("Timestamp", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); + testWithSchemaNullFieldConversion("Timestamp", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); + testWithSchemaNullFieldConversion("Timestamp", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); + testWithSchemaNullFieldConversion("Timestamp", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); + testWithSchemaNullFieldConversion("Timestamp", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); + } + + @Test + public void testWithSchemaNullValueToUnix() { + testWithSchemaNullValueConversion("unix", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); + testWithSchemaNullValueConversion("unix", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); + testWithSchemaNullValueConversion("unix", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); + testWithSchemaNullValueConversion("unix", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); + testWithSchemaNullValueConversion("unix", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); + } + + @Test + public void testWithSchemaNullFieldToUnix() { + testWithSchemaNullFieldConversion("unix", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); + testWithSchemaNullFieldConversion("unix", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); + testWithSchemaNullFieldConversion("unix", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); + testWithSchemaNullFieldConversion("unix", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); + testWithSchemaNullFieldConversion("unix", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); + } + + @Test + public void testWithSchemaNullValueToTime() { + testWithSchemaNullValueConversion("Time", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); + testWithSchemaNullValueConversion("Time", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); + testWithSchemaNullValueConversion("Time", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); + testWithSchemaNullValueConversion("Time", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); + testWithSchemaNullValueConversion("Time", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); + } + + @Test + public void testWithSchemaNullFieldToTime() { + testWithSchemaNullFieldConversion("Time", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); + testWithSchemaNullFieldConversion("Time", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); + testWithSchemaNullFieldConversion("Time", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); + testWithSchemaNullFieldConversion("Time", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); + testWithSchemaNullFieldConversion("Time", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); + } + + @Test + public void testWithSchemaNullValueToDate() { + testWithSchemaNullValueConversion("Date", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); + testWithSchemaNullValueConversion("Date", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); + testWithSchemaNullValueConversion("Date", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); + testWithSchemaNullValueConversion("Date", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); + testWithSchemaNullValueConversion("Date", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); + } + + @Test + public void testWithSchemaNullFieldToDate() { + testWithSchemaNullFieldConversion("Date", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); + testWithSchemaNullFieldConversion("Date", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); + testWithSchemaNullFieldConversion("Date", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); + testWithSchemaNullFieldConversion("Date", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); + testWithSchemaNullFieldConversion("Date", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); + } + + @Test + public void testWithSchemaNullValueToString() { + testWithSchemaNullValueConversion("string", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + testWithSchemaNullValueConversion("string", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + testWithSchemaNullValueConversion("string", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + testWithSchemaNullValueConversion("string", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + testWithSchemaNullValueConversion("string", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + } + + @Test + public void testWithSchemaNullFieldToString() { + testWithSchemaNullFieldConversion("string", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + testWithSchemaNullFieldConversion("string", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + testWithSchemaNullFieldConversion("string", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + testWithSchemaNullFieldConversion("string", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + testWithSchemaNullFieldConversion("string", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + } + + private void testWithSchemaNullValueConversion(String targetType, Schema originalSchema, Schema expectedSchema) { + Map<String, String> config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType); + config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); + xformValue.configure(config); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(originalSchema, null)); + + assertEquals(expectedSchema, transformed.valueSchema()); + assertNull(transformed.value()); + } + + private void testWithSchemaNullFieldConversion(String targetType, Schema originalSchema, Schema expectedSchema) { + Map<String, String> config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType); + config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); + config.put(TimestampConverter.FIELD_CONFIG, "ts"); + xformValue.configure(config); + SchemaBuilder structSchema = SchemaBuilder.struct() + .field("ts", originalSchema) + .field("other", Schema.STRING_SCHEMA); + + SchemaBuilder expectedStructSchema = SchemaBuilder.struct() + .field("ts", expectedSchema) + .field("other", Schema.STRING_SCHEMA); + + Struct original = new Struct(structSchema); + original.put("ts", null); + original.put("other", "test"); + + // Struct field is null + SourceRecord transformed = xformValue.apply(createRecordWithSchema(structSchema.build(), original)); + + assertEquals(expectedStructSchema.build(), transformed.valueSchema()); + assertNull(requireStruct(transformed.value(), "").get("ts")); + + // entire Struct is null + transformed = xformValue.apply(createRecordWithSchema(structSchema.optional().build(), null)); + + assertEquals(expectedStructSchema.optional().build(), transformed.valueSchema()); + assertNull(transformed.value()); + } // Convert field instead of entire key/value @@ -306,7 +490,7 @@ public class TimestampConverterTest { xformValue.configure(config); Object value = Collections.singletonMap("ts", DATE_PLUS_TIME.getTime()); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, value)); + SourceRecord transformed = xformValue.apply(createRecordSchemaless(value)); assertNull(transformed.valueSchema()); assertEquals(Collections.singletonMap("ts", DATE.getTime()), transformed.value()); @@ -328,7 +512,7 @@ public class TimestampConverterTest { original.put("ts", DATE_PLUS_TIME_UNIX); original.put("other", "test"); - SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, structWithTimestampFieldSchema, original)); + SourceRecord transformed = xformValue.apply(createRecordWithSchema(structWithTimestampFieldSchema, original)); Schema expectedSchema = SchemaBuilder.struct() .field("ts", Timestamp.SCHEMA) @@ -351,4 +535,11 @@ public class TimestampConverterTest { assertEquals(DATE_PLUS_TIME.getTime(), transformed.key()); } + private SourceRecord createRecordWithSchema(Schema schema, Object value) { + return new SourceRecord(null, null, "topic", 0, schema, value); + } + + private SourceRecord createRecordSchemaless(Object value) { + return createRecordWithSchema(null, value); + } }