This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3411251  NIFI-6640 - UNION/CHOICE types not handled correctly 3 
important changes: 1. FieldTypeInference had a bug when dealing with multiple 
datatypes for  the same field where some (but not all) were in a 
wider-than-the-other  relationship.  Before: Some datatypes could be lost. 
String was wider than any other.  After: Consistent behaviour. String is NOT 
wider than any other. 2. Choosing a datatype for a value from a ChoiceDataType: 
 Before it chose the first compatible datatyp [...]
3411251 is described below

commit 34112519c2dde19d704ef624e62e51b399cf1ce7
Author: Tamas Palfy <[email protected]>
AuthorDate: Fri Sep 13 16:39:24 2019 +0200

    NIFI-6640 - UNION/CHOICE types not handled correctly
    3 important changes:
    1. FieldTypeInference had a bug when dealing with multiple datatypes for
     the same field where some (but not all) were in a wider-than-the-other
     relationship.
     Before: Some datatypes could be lost. String was wider than any other.
     After: Consistent behaviour. String is NOT wider than any other.
    2. Choosing a datatype for a value from a ChoiceDataType:
     Before it chose the first compatible datatype as the basis of conversion.
     After change it tries to find the most suitable datatype.
    3. Conversion of a value of avro union type:
     Before it chose the first compatible datatype as the basis of conversion.
     After change it tries to find the most suitable datatype.
    
    Change: In the RecordFieldType enum moved TIMESTAMP ahead of DATE.
    
    This closes #3724.
    
    Signed-off-by: Mark Payne <[email protected]>
---
 .../nifi/serialization/record/RecordFieldType.java |  10 +-
 .../serialization/record/util/DataTypeUtils.java   |  91 ++++-
 .../serialization/record/TestDataTypeUtils.java    | 224 ++++++++++++
 .../java/org/apache/nifi/avro/AvroTypeUtil.java    |  11 +
 .../org/apache/nifi/avro/TestAvroTypeUtil.java     |  53 +++
 .../nifi-standard-processors/pom.xml               |   8 +
 .../processors/standard/AbstractConversionIT.java  | 388 +++++++++++++++++++++
 .../standard/ConversionWithExplicitSchemaIT.java   |  88 +++++
 .../standard/ConversionWithSchemaInferenceIT.java  |  51 +++
 .../TestConversions/data.int_float_string.json     |  22 ++
 .../data.int_float_string.with_header.csv          |   8 +
 .../data.int_float_string.with_schema.avro         | Bin 0 -> 302 bytes
 .../data.int_float_string.with_schema.json.to.avro | Bin 0 -> 322 bytes
 .../data.int_float_string.without_header.csv       |   7 +
 .../data.int_float_string.without_schema.avro      | Bin 0 -> 51 bytes
 .../TestConversions/data.int_float_string.xml      |  31 ++
 .../resources/TestConversions/explicit.schema.json |  23 ++
 .../nifi/schema/inference/FieldTypeInference.java  |  35 +-
 .../apache/nifi/csv/TestCSVSchemaInference.java    |  16 +-
 .../org/apache/nifi/csv/TestWriteCSVResult.java    |   2 +-
 .../json/TestInferJsonSchemaAccessStrategy.java    |  12 +-
 .../schema/inference/TestFieldTypeInference.java   | 182 ++++++++++
 .../org/apache/nifi/xml/TestInferXmlSchema.java    |   8 +-
 .../src/test/resources/json/output/dataTypes.json  |   2 +-
 24 files changed, 1233 insertions(+), 39 deletions(-)

diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
index de9aa58..413c128 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -73,6 +73,11 @@ public enum RecordFieldType {
     DOUBLE("double", FLOAT),
 
     /**
+     * A timestamp field type. Fields of this type use a {@code 
java.sql.Timestamp} value.
+     */
+    TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
+
+    /**
      * A date field type. Fields of this type use a {@code java.sql.Date} 
value.
      */
     DATE("date", "yyyy-MM-dd"),
@@ -83,11 +88,6 @@ public enum RecordFieldType {
     TIME("time", "HH:mm:ss"),
 
     /**
-     * A timestamp field type. Fields of this type use a {@code 
java.sql.Timestamp} value.
-     */
-    TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
-
-    /**
      * A char field type. Fields of this type use a {@code char} value.
      */
     CHAR("char"),
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 0686dcf..308cafa 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
 import java.io.Reader;
+import java.lang.reflect.Array;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -47,16 +48,21 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -225,17 +231,75 @@ public class DataTypeUtils {
     }
 
     public static DataType chooseDataType(final Object value, final 
ChoiceDataType choiceType) {
-        for (final DataType subType : choiceType.getPossibleSubTypes()) {
-            if (isCompatibleDataType(value, subType)) {
-                if (subType.getFieldType() == RecordFieldType.CHOICE) {
-                    return chooseDataType(value, (ChoiceDataType) subType);
+        Queue<DataType> possibleSubTypes = new 
LinkedList<>(choiceType.getPossibleSubTypes());
+        List<DataType> compatibleSimpleSubTypes = new ArrayList<>();
+
+        DataType subType;
+        while ((subType = possibleSubTypes.poll()) != null) {
+            if (subType instanceof ChoiceDataType) {
+                possibleSubTypes.addAll(((ChoiceDataType) 
subType).getPossibleSubTypes());
+            } else {
+                if (isCompatibleDataType(value, subType)) {
+                    compatibleSimpleSubTypes.add(subType);
+                }
+            }
+        }
+
+        int nrOfCompatibleSimpleSubTypes = compatibleSimpleSubTypes.size();
+
+        final DataType chosenSimpleType;
+        if (nrOfCompatibleSimpleSubTypes == 0) {
+            chosenSimpleType = null;
+        } else if (nrOfCompatibleSimpleSubTypes == 1) {
+            chosenSimpleType = compatibleSimpleSubTypes.get(0);
+        } else {
+            chosenSimpleType = findMostSuitableType(value, 
compatibleSimpleSubTypes, Function.identity())
+                    .orElse(compatibleSimpleSubTypes.get(0));
+        }
+
+        return chosenSimpleType;
+    }
+
+    public static <T> Optional<T> findMostSuitableType(Object value, List<T> 
types, Function<T, DataType> dataTypeMapper) {
+        if (value instanceof String) {
+            return findMostSuitableTypeByStringValue((String) value, types, 
dataTypeMapper);
+        } else {
+            DataType inferredDataType = inferDataType(value, null);
+
+            if (inferredDataType != null && 
!inferredDataType.getFieldType().equals(RecordFieldType.STRING)) {
+                for (T type : types) {
+                    if (inferredDataType.equals(dataTypeMapper.apply(type))) {
+                        return Optional.of(type);
+                    }
                 }
 
-                return subType;
+                for (T type : types) {
+                    if (getWiderType(dataTypeMapper.apply(type), 
inferredDataType).isPresent()) {
+                        return Optional.of(type);
+                    }
+                }
             }
         }
 
-        return null;
+        return Optional.empty();
+    }
+
+    public static <T> Optional<T> findMostSuitableTypeByStringValue(String 
valueAsString, List<T> types, Function<T, DataType> dataTypeMapper) {
+        // Sorting based on the RecordFieldType enum ordering looks 
appropriate here as we want simpler types
+        //  first and the enum's ordering seems to reflect that
+        Collections.sort(types, Comparator.comparing(type -> 
dataTypeMapper.apply(type).getFieldType()));
+
+        for (T type : types) {
+            try {
+                if (isCompatibleDataType(valueAsString, 
dataTypeMapper.apply(type))) {
+                    return Optional.of(type);
+                }
+            } catch (Exception e) {
+                logger.error("Exception thrown while checking if '" + 
valueAsString + "' is compatible with '" + type + "'", e);
+            }
+        }
+
+        return Optional.empty();
     }
 
     public static Record toRecord(final Object value, final RecordSchema 
recordSchema, final String fieldName) {
@@ -440,12 +504,12 @@ public class DataTypeUtils {
 //            final DataType elementDataType = inferDataType(valueFromMap, 
RecordFieldType.STRING.getDataType());
 //            return RecordFieldType.MAP.getMapDataType(elementDataType);
         }
-        if (value instanceof Object[]) {
-            final Object[] array = (Object[]) value;
-
+        if (value.getClass().isArray()) {
             DataType mergedDataType = null;
-            for (final Object arrayValue : array) {
-                final DataType inferredDataType = inferDataType(arrayValue, 
RecordFieldType.STRING.getDataType());
+
+            int length = Array.getLength(value);
+            for(int index = 0; index < length; index++) {
+                final DataType inferredDataType = 
inferDataType(Array.get(value, index), RecordFieldType.STRING.getDataType());
                 mergedDataType = mergeDataTypes(mergedDataType, 
inferredDataType);
             }
 
@@ -1545,7 +1609,10 @@ public class DataTypeUtils {
                 possibleTypes.add(otherDataType);
             }
 
-            return RecordFieldType.CHOICE.getChoiceDataType(new 
ArrayList<>(possibleTypes));
+            ArrayList<DataType> possibleChildTypes = new 
ArrayList<>(possibleTypes);
+            Collections.sort(possibleChildTypes, 
Comparator.comparing(DataType::getFieldType));
+
+            return 
RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
         }
     }
 
diff --git 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index 89a0490..30b2a60 100644
--- 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.serialization.record;
 
 import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.junit.Test;
@@ -27,10 +28,17 @@ import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -324,4 +332,220 @@ public class TestDataTypeUtils {
         }
         assertNotNull(e);
     }
+
+    @Test
+    public void 
testFindMostSuitableTypeByStringValueShouldReturnEvenWhenOneTypeThrowsException()
 {
+        String valueAsString = "value";
+
+        String nonMatchingType = "nonMatchingType";
+        String throwsExceptionType = "throwsExceptionType";
+        String matchingType = "matchingType";
+
+        List<String> types = Arrays.asList(
+                nonMatchingType,
+                throwsExceptionType,
+                matchingType
+        );
+        Optional<String> expected = Optional.of(matchingType);
+
+        AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+
+        Function<String, DataType> dataTypeMapper = type -> {
+            if (type.equals(nonMatchingType)) {
+                return RecordFieldType.BOOLEAN.getDataType();
+            } else if (type.equals(throwsExceptionType)) {
+                return new DataType(RecordFieldType.DATE, null) {
+                    @Override
+                    public String getFormat() {
+                        exceptionThrown.set(true);
+                        throw new RuntimeException("maching error");
+                    }
+                };
+            } else if (type.equals(matchingType)) {
+                return RecordFieldType.STRING.getDataType();
+            }
+
+            return null;
+        };
+
+        Optional<String> actual = 
DataTypeUtils.findMostSuitableTypeByStringValue(valueAsString, types, 
dataTypeMapper);
+        assertTrue("Exception not thrown during test as intended.", 
exceptionThrown.get());
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testChooseDataTypeWhenInt_vs_INT_FLOAT_ThenShouldReturnINT() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        Object value = 1;
+        DataType expected = RecordFieldType.INT.getDataType();
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    @Test
+    public void 
testChooseDataTypeWhenFloat_vs_INT_FLOAT_ThenShouldReturnFLOAT() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        Object value = 1.5f;
+        DataType expected = RecordFieldType.FLOAT.getDataType();
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    @Test
+    public void 
testChooseDataTypeWhenHasChoiceThenShouldReturnSingleMatchingFromChoice() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.DOUBLE.getDataType(),
+                RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.FLOAT.getDataType(),
+                        RecordFieldType.STRING.getDataType()
+                )
+        );
+
+        Object value = 1.5f;
+        DataType expected = RecordFieldType.FLOAT.getDataType();
+
+        // WHEN
+        // THEN
+        testChooseDataTypeAlsoReverseTypes(value, dataTypes, expected);
+    }
+
+    private <E> void testChooseDataTypeAlsoReverseTypes(Object value, 
List<DataType> dataTypes, DataType expected) {
+        testChooseDataType(dataTypes, value, expected);
+        Collections.reverse(dataTypes);
+        testChooseDataType(dataTypes, value, expected);
+    }
+
+    private void testChooseDataType(List<DataType> dataTypes, Object value, 
DataType expected) {
+        // GIVEN
+        ChoiceDataType choiceDataType = (ChoiceDataType) 
RecordFieldType.CHOICE.getChoiceDataType(dataTypes.toArray(new 
DataType[dataTypes.size()]));
+
+        // WHEN
+        DataType actual = DataTypeUtils.chooseDataType(value, choiceDataType);
+
+        // THEN
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithBoolean() {
+        testFindMostSuitableType(true, RecordFieldType.BOOLEAN.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithByte() {
+        testFindMostSuitableType(Byte.valueOf((byte) 123), 
RecordFieldType.BYTE.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithShort() {
+        testFindMostSuitableType(Short.valueOf((short) 123), 
RecordFieldType.SHORT.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithInt() {
+        testFindMostSuitableType(123, RecordFieldType.INT.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithLong() {
+        testFindMostSuitableType(123L, RecordFieldType.LONG.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithBigInt() {
+        testFindMostSuitableType(BigInteger.valueOf(123L), 
RecordFieldType.BIGINT.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithFloat() {
+        testFindMostSuitableType(12.3F, RecordFieldType.FLOAT.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithDouble() {
+        testFindMostSuitableType(12.3, RecordFieldType.DOUBLE.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithDate() {
+        testFindMostSuitableType("1111-11-11", 
RecordFieldType.DATE.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithTime() {
+        testFindMostSuitableType("11:22:33", 
RecordFieldType.TIME.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithTimeStamp() {
+        testFindMostSuitableType("1111-11-11 11:22:33", 
RecordFieldType.TIMESTAMP.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithChar() {
+        testFindMostSuitableType('a', RecordFieldType.CHAR.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithStringShouldReturnChar() {
+        testFindMostSuitableType("abc", RecordFieldType.CHAR.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithString() {
+        testFindMostSuitableType("abc", RecordFieldType.STRING.getDataType(), 
RecordFieldType.CHAR.getDataType());
+    }
+
+    @Test
+    public void testFindMostSuitableTypeWithArray() {
+        testFindMostSuitableType(new int[]{1, 2, 3}, 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()));
+    }
+
+    private void testFindMostSuitableType(Object value, DataType expected, 
DataType... filtered) {
+        List<DataType> filteredOutDataTypes = 
Arrays.stream(filtered).collect(Collectors.toList());
+
+        // GIVEN
+        List<DataType> unexpectedTypes = 
Arrays.stream(RecordFieldType.values())
+                .flatMap(recordFieldType -> {
+                    Stream<DataType> dataTypeStream;
+
+                    if (RecordFieldType.ARRAY.equals(recordFieldType)) {
+                        dataTypeStream = 
Arrays.stream(RecordFieldType.values()).map(elementType -> 
RecordFieldType.ARRAY.getArrayDataType(elementType.getDataType()));
+                    } else {
+                        dataTypeStream = 
Stream.of(recordFieldType.getDataType());
+                    }
+
+                    return dataTypeStream;
+                })
+                .filter(dataType -> !dataType.equals(expected))
+                .filter(dataType -> !filteredOutDataTypes.contains(dataType))
+                .collect(Collectors.toList());
+
+        IntStream.rangeClosed(0, unexpectedTypes.size()).forEach(insertIndex 
-> {
+            List<DataType> allTypes = new LinkedList<>(unexpectedTypes);
+            allTypes.add(insertIndex, expected);
+
+            // WHEN
+            Optional<DataType> actual = 
DataTypeUtils.findMostSuitableType(value, allTypes, Function.identity());
+
+            // THEN
+            assertEquals(Optional.ofNullable(expected), actual);
+        });
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 2bc95bc..a0eea8b 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -74,6 +74,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class AvroTypeUtil {
     private static final Logger logger = 
LoggerFactory.getLogger(AvroTypeUtil.class);
@@ -877,6 +878,16 @@ public class AvroTypeUtil {
      */
     private static Object convertUnionFieldValue(final Object originalValue, 
final Schema fieldSchema, final Function<Schema, Object> conversion, final 
String fieldName) {
         boolean foundNonNull = false;
+
+        Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
+                originalValue,
+                fieldSchema.getTypes().stream().filter(schema -> 
schema.getType() != Type.NULL).collect(Collectors.toList()),
+                subSchema -> AvroTypeUtil.determineDataType(subSchema)
+        );
+        if (mostSuitableType.isPresent()) {
+            return conversion.apply(mostSuitableType.get());
+        }
+
         for (final Schema subSchema : fieldSchema.getTypes()) {
             if (subSchema.getType() == Type.NULL) {
                 continue;
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 0ecbe25..a89ebe4 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -587,4 +588,56 @@ public class TestAvroTypeUtil {
             assertNotNull(((Record)inner).get("Message"));
         }
     }
+
+    @Test
+    public void 
testConvertToAvroObjectWhenIntVSUnion_INT_FLOAT_ThenReturnInt() {
+        // GIVEN
+        List<Schema.Type> schemaTypes = Arrays.asList(
+                Schema.Type.INT,
+                Schema.Type.FLOAT
+        );
+        Integer rawValue = 1;
+
+        Object expected = 1;
+
+        // WHEN
+        // THEN
+        testConvertToAvroObjectAlsoReverseSchemaList(expected, rawValue, 
schemaTypes);
+    }
+
+    @Test
+    public void 
testConvertToAvroObjectWhenFloatVSUnion_INT_FLOAT_ThenReturnFloat() {
+        // GIVEN
+        List<Schema.Type> schemaTypes = Arrays.asList(
+                Schema.Type.INT,
+                Schema.Type.FLOAT
+        );
+        Float rawValue = 1.5f;
+
+        Object expected = 1.5f;
+
+        // WHEN
+        // THEN
+        testConvertToAvroObjectAlsoReverseSchemaList(expected, rawValue, 
schemaTypes);
+    }
+
+    private void testConvertToAvroObjectAlsoReverseSchemaList(Object expected, 
Object rawValue, List<Schema.Type> schemaTypes) {
+        // GIVEN
+        List<Schema> schemaList = schemaTypes.stream()
+                .map(Schema::create)
+                .collect(Collectors.toList());
+
+        // WHEN
+        Object actual = AvroTypeUtil.convertToAvroObject(rawValue, 
Schema.createUnion(schemaList), StandardCharsets.UTF_16);
+
+        // THEN
+        assertEquals(expected, actual);
+
+        // WHEN
+        Collections.reverse(schemaList);
+        Object actualAfterReverse = AvroTypeUtil.convertToAvroObject(rawValue, 
Schema.createUnion(schemaList), StandardCharsets.UTF_16);
+
+        // THEN
+        assertEquals(expected, actualAfterReverse);
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 0b15be0..ed51c40 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -411,6 +411,14 @@
                         
<exclude>src/test/resources/TestMergeContent/head</exclude>
                         
<exclude>src/test/resources/TestMergeContent/user.avsc</exclude>
                         
<exclude>src/test/resources/TestMergeContent/place.avsc</exclude>
+                        
<exclude>src/test/resources/TestConversions/data.int_float_string.json</exclude>
+                        
<exclude>src/test/resources/TestConversions/data.int_float_string.with_header.csv</exclude>
+                        
<exclude>src/test/resources/TestConversions/data.int_float_string.without_header.csv</exclude>
+                        
<exclude>src/test/resources/TestConversions/data.int_float_string.xml</exclude>
+                        
<exclude>src/test/resources/TestConversions/data.int_float_string.with_schema.avro</exclude>
+                        
<exclude>src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro</exclude>
+                        
<exclude>src/test/resources/TestConversions/data.int_float_string.without_schema.avro</exclude>
+                        
<exclude>src/test/resources/TestConversions/explicit.schema.json</exclude>
                         
<exclude>src/test/resources/TestConvertJSONToSQL/person-1.json</exclude>
                         
<exclude>src/test/resources/TestConvertJSONToSQL/persons.json</exclude>
                         
<exclude>src/test/resources/TestConvertJSONToSQL/malformed-person-extra-comma.json</exclude>
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractConversionIT.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractConversionIT.java
new file mode 100644
index 0000000..9b458a2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractConversionIT.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.avro.AvroReader;
+import org.apache.nifi.avro.AvroReaderWithEmbeddedSchema;
+import org.apache.nifi.avro.AvroRecordSetWriter;
+import org.apache.nifi.csv.CSVReader;
+import org.apache.nifi.csv.CSVRecordSetWriter;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.xml.XMLReader;
+import org.apache.nifi.xml.XMLRecordSetWriter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class AbstractConversionIT {
+    protected RecordReaderFactory reader;
+    protected Consumer<TestRunner> inputHandler;
+    protected Consumer<TestRunner> readerConfigurer;
+
+    protected RecordSetWriterFactory writer;
+    protected Consumer<MockFlowFile> resultHandler;
+    protected Consumer<TestRunner> writerConfigurer;
+
+    @Before
+    public void setUp() throws Exception {
+        reader = null;
+        inputHandler = null;
+        readerConfigurer = null;
+
+        writer = null;
+        resultHandler = null;
+        writerConfigurer = null;
+    }
+
+    @Test
+    public void testCsvToJson() throws Exception {
+        fromCsv(csvPostfix());
+        toJson(jsonPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testCsvToAvro() throws Exception {
+        fromCsv(csvPostfix());
+        toAvro(avroPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testCsvToAvroToCsv() throws Exception {
+        fromCsv(csvPostfix());
+
+        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+        AvroReader reader2 = new AvroReader();
+
+        toCsv(csvPostfix());
+
+        testChain(writer2, reader2);
+    }
+
+    @Test
+    public void testCsvToXml() throws Exception {
+        fromCsv(csvPostfix());
+        toXml(xmlPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testJsonToCsv() throws Exception {
+        fromJson(jsonPostfix());
+        toCsv(csvPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testJsonToAvro() throws Exception {
+        fromJson(jsonPostfix());
+        toAvro(avroPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testJsonToAvroToJson() throws Exception {
+        fromJson(jsonPostfix());
+
+        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+        AvroReader reader2 = new AvroReader();
+
+        toJson(jsonPostfix());
+
+        testChain(writer2, reader2);
+    }
+
+    @Test
+    public void testAvroToCsv() throws Exception {
+        fromAvro(avroPostfix());
+        toCsv(csvPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testAvroToJson() throws Exception {
+        fromAvro(avroPostfix());
+        toJson(jsonPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testAvroToXml() throws Exception {
+        fromAvro(avroPostfix());
+        toXml(xmlPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testXmlToCsv() throws Exception {
+        fromXml(xmlPostfix());
+        toCsv(csvPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testXmlToJson() throws Exception {
+        fromXml(xmlPostfix());
+        toJson(jsonPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testXmlToAvro() throws Exception {
+        fromXml(xmlPostfix());
+        toAvro(avroPostfix());
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+
+    @Test
+    public void testXmlToAvroToXml() throws Exception {
+        fromXml(xmlPostfix());
+
+        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
+        AvroReader reader2 = new AvroReader();
+
+        toXml(xmlPostfix());
+
+        testChain(writer2, reader2);
+    }
+
+    abstract protected String csvPostfix();
+
+    abstract protected String jsonPostfix();
+
+    abstract protected String avroPostfix();
+
+    abstract protected String xmlPostfix();
+
+    protected void commonReaderConfiguration(TestRunner testRunner) {
+    }
+
+    protected void commonWriterConfiguration(TestRunner testRunner) {
+    }
+
+    protected void fromCsv(String postfix) {
+        reader = new CSVReader();
+        inputHandler = stringInputHandler(getContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+        };
+    }
+
+    protected void fromJson(String postfix) {
+        reader = new JsonTreeReader();
+        inputHandler = stringInputHandler(getContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+        };
+    }
+
+    protected void fromXml(String postfix) {
+        reader = new XMLReader();
+        inputHandler = stringInputHandler(getContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+            testRunner.setProperty(reader, XMLReader.RECORD_FORMAT, 
XMLReader.RECORD_ARRAY);
+        };
+    }
+
+    protected void fromAvro(String postfix) {
+        reader = new AvroReader();
+        inputHandler = byteInputHandler(getByteContent(postfix));
+
+        readerConfigurer = testRunner -> {
+            commonReaderConfiguration(testRunner);
+        };
+    }
+
+    protected void toCsv(String postfix) {
+        writer = new CSVRecordSetWriter();
+        resultHandler = stringOutputHandler(getContent(postfix));
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+        };
+    }
+
+    protected void toJson(String postfix) {
+        writer = new JsonRecordSetWriter();
+        resultHandler = stringOutputHandler(getContent(postfix));
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+            testRunner.setProperty(writer, "Pretty Print JSON", "true");
+        };
+    }
+
+    protected void toXml(String postfix) {
+        writer = new XMLRecordSetWriter();
+        resultHandler = stringOutputHandler(getContent(postfix));
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+            testRunner.setProperty(writer, "pretty_print_xml", "true");
+            testRunner.setProperty(writer, "root_tag_name", "root");
+            testRunner.setProperty(writer, "record_tag_name", "nifiRecord");
+        };
+    }
+
+    protected void toAvro(String postfix) {
+        writer = new AvroRecordSetWriter();
+        resultHandler = mockFlowFile -> {
+            try {
+                List<Map<String, Object>> expected = 
getRecords(getByteContent(postfix));
+                List<Map<String, Object>> actual = 
getRecords(mockFlowFile.toByteArray());
+
+                assertEquals(expected, actual);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+
+        writerConfigurer = testRunner -> {
+            commonWriterConfiguration(testRunner);
+        };
+    }
+
+    protected Consumer<TestRunner> stringInputHandler(String input) {
+        return testRunner -> testRunner.enqueue(input);
+    }
+
+    protected Consumer<TestRunner> byteInputHandler(byte[] input) {
+        return testRunner -> testRunner.enqueue(input);
+    }
+
+    protected Consumer<MockFlowFile> stringOutputHandler(String expected) {
+        return mockFlowFile -> mockFlowFile.assertContentEquals(expected);
+    }
+
+    protected String getContent(String postfix) {
+        return new String(getByteContent(postfix));
+    }
+
+    protected byte[] getByteContent(String postfix) {
+        try {
+            return 
Files.readAllBytes(Paths.get("src/test/resources/TestConversions/data.int_float_string."
 + postfix));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected List<Map<String, Object>> getRecords(byte[] avroData) throws 
IOException, MalformedRecordException {
+        try (RecordReader reader = new AvroReaderWithEmbeddedSchema(new 
ByteArrayInputStream(avroData));) {
+            return getRecords(reader);
+        }
+    }
+
+    protected List<Map<String, Object>> getRecords(RecordReader reader) throws 
IOException, MalformedRecordException {
+        List<Map<String, Object>> records = new ArrayList<>();
+
+        Record record;
+        while ((record = reader.nextRecord()) != null) {
+            records.add(record.toMap());
+        }
+
+        return records;
+    }
+
+    protected void testChain(RecordSetWriterFactory writer2, 
RecordReaderFactory reader2) throws InitializationException {
+        testConversion(reader, readerConfigurer, writer2, null,
+                inputHandler,
+                mockFlowFile -> {
+                    try {
+                        testConversion(reader2, null, writer, writerConfigurer,
+                                testRunner -> testRunner.enqueue(mockFlowFile),
+                                resultHandler
+                        );
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    protected <R extends RecordReaderFactory, W extends 
RecordSetWriterFactory> void testConversion(
+            R reader,
+            Consumer<TestRunner> readerConfigurer,
+            W writer,
+            Consumer<TestRunner> writerConfigurer,
+            Consumer<TestRunner> inputHandler,
+            Consumer<MockFlowFile> resultHandler
+    ) throws InitializationException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertRecord.class);
+
+        String readerId = UUID.randomUUID().toString();
+        String writerId = UUID.randomUUID().toString();
+
+        runner.addControllerService(readerId, reader);
+        runner.addControllerService(writerId, writer);
+
+        Optional.ofNullable(readerConfigurer).ifPresent(_configurer -> 
_configurer.accept(runner));
+        Optional.ofNullable(writerConfigurer).ifPresent(_configurer -> 
_configurer.accept(runner));
+
+        runner.enableControllerService(reader);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(ConvertRecord.RECORD_READER, readerId);
+        runner.setProperty(ConvertRecord.RECORD_WRITER, writerId);
+
+        inputHandler.accept(runner);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+
+        resultHandler.accept(flowFile);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithExplicitSchemaIT.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithExplicitSchemaIT.java
new file mode 100644
index 0000000..81467e3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithExplicitSchemaIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroReaderWithExplicitSchema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.csv.CSVUtils;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.TestRunner;
+import org.junit.Before;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+
+public class ConversionWithExplicitSchemaIT extends AbstractConversionIT {
+    private String schema;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        schema = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestConversions/explicit.schema.json")));
+    }
+
+    @Override
+    protected String csvPostfix() {
+        return "without_header.csv";
+    }
+
+    @Override
+    protected String jsonPostfix() {
+        return "json";
+    }
+
+    @Override
+    protected String avroPostfix() {
+        return "without_schema.avro";
+    }
+
+    @Override
+    protected String xmlPostfix() {
+        return "xml";
+    }
+
+    @Override
+    protected void commonReaderConfiguration(TestRunner testRunner) {
+        testRunner.setProperty(reader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, schema);
+    }
+
+    @Override
+    protected void commonWriterConfiguration(TestRunner testRunner) {
+        testRunner.setProperty(writer, "Schema Write Strategy", "no-schema");
+        testRunner.setProperty(writer, CSVUtils.INCLUDE_HEADER_LINE, "false");
+    }
+
+    @Override
+    protected List<Map<String, Object>> getRecords(byte[] avroData) throws 
IOException, MalformedRecordException {
+        Schema avroSchema = new Schema.Parser().parse(schema);
+        RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
+
+        try (RecordReader reader = new AvroReaderWithExplicitSchema(new 
ByteArrayInputStream(avroData), recordSchema, avroSchema);) {
+            return getRecords(reader);
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
new file mode 100644
index 0000000..bed820c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+public class ConversionWithSchemaInferenceIT extends AbstractConversionIT {
+    @Override
+    protected String csvPostfix() {
+        return "with_header.csv";
+    }
+
+    @Override
+    protected String jsonPostfix() {
+        return "json";
+    }
+
+    @Override
+    protected String avroPostfix() {
+        return "with_schema.avro";
+    }
+
+    @Override
+    protected String xmlPostfix() {
+        return "xml";
+    }
+
+    @Override
+    public void testJsonToAvro() throws Exception {
+        fromJson(jsonPostfix());
+
+        // JSON schema inference doesn't discern INT and FLOAT but uses LONG 
and DOUBLE instead.
+        //  So the expected avro is a little bit different as the deserialized 
values also end up in
+        //      Long and Double objects
+        toAvro("with_schema.json.to.avro");
+
+        testConversion(reader, readerConfigurer, writer, writerConfigurer, 
inputHandler, resultHandler);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.json
new file mode 100644
index 0000000..1971dcf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.json
@@ -0,0 +1,22 @@
+[ {
+  "Id" : 1,
+  "Int_Float_String" : 3
+}, {
+  "Id" : 2,
+  "Int_Float_String" : 3.75
+}, {
+  "Id" : 3,
+  "Int_Float_String" : 3.85
+}, {
+  "Id" : 4,
+  "Int_Float_String" : 8
+}, {
+  "Id" : 5,
+  "Int_Float_String" : 2.0
+}, {
+  "Id" : 6,
+  "Int_Float_String" : 4.0
+}, {
+  "Id" : 7,
+  "Int_Float_String" : "some_string"
+} ]
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_header.csv
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_header.csv
new file mode 100644
index 0000000..d997288
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_header.csv
@@ -0,0 +1,8 @@
+Id,Int_Float_String
+1,3
+2,3.75
+3,3.85
+4,8
+5,2.0
+6,4.0
+7,some_string
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.avro
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.avro
new file mode 100644
index 0000000..3c18077
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.avro
 differ
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
new file mode 100644
index 0000000..4177b05
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
 differ
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_header.csv
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_header.csv
new file mode 100644
index 0000000..65cf365
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_header.csv
@@ -0,0 +1,7 @@
+1,3
+2,3.75
+3,3.85
+4,8
+5,2.0
+6,4.0
+7,some_string
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_schema.avro
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_schema.avro
new file mode 100644
index 0000000..c800fe6
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.without_schema.avro
 differ
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.xml
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.xml
new file mode 100644
index 0000000..36a9b10
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" ?>
+<root>
+  <nifiRecord>
+    <Id>1</Id>
+    <Int_Float_String>3</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>2</Id>
+    <Int_Float_String>3.75</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>3</Id>
+    <Int_Float_String>3.85</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>4</Id>
+    <Int_Float_String>8</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>5</Id>
+    <Int_Float_String>2.0</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>6</Id>
+    <Int_Float_String>4.0</Int_Float_String>
+  </nifiRecord>
+  <nifiRecord>
+    <Id>7</Id>
+    <Int_Float_String>some_string</Int_Float_String>
+  </nifiRecord>
+</root>
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/explicit.schema.json
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/explicit.schema.json
new file mode 100644
index 0000000..5c578b0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/explicit.schema.json
@@ -0,0 +1,23 @@
+{
+  "type":"record",
+  "name":"nifiRecord",
+  "namespace":"org.apache.nifi",
+  "fields":[
+    {
+      "name":"Id",
+      "type":[
+        "null",
+        "int"
+      ]
+    },
+    {
+      "name":"Int_Float_String",
+      "type":[
+        "int",
+        "float",
+        "string",
+        "null"
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java
index e148baf..1f52cb8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/inference/FieldTypeInference.java
@@ -23,6 +23,7 @@ import 
org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 
 public class FieldTypeInference {
@@ -33,7 +34,7 @@ public class FieldTypeInference {
     // unique value for the data type, and so this paradigm allows us to avoid 
the cost of creating
     // and using the HashSet.
     private DataType singleDataType = null;
-    private Set<DataType> possibleDataTypes;
+    private Set<DataType> possibleDataTypes = new HashSet<>();
 
     public void addPossibleDataType(final DataType dataType) {
         if (dataType == null) {
@@ -45,7 +46,7 @@ public class FieldTypeInference {
             return;
         }
 
-        if (possibleDataTypes == null && singleDataType.equals(dataType)) {
+        if (singleDataType.equals(dataType) || 
possibleDataTypes.contains(dataType)) {
             return;
         }
 
@@ -62,36 +63,42 @@ public class FieldTypeInference {
             final RecordSchema newSchema = ((RecordDataType) 
dataType).getChildSchema();
 
             final RecordSchema mergedSchema = 
DataTypeUtils.merge(singleDataTypeSchema, newSchema);
+            possibleDataTypes.remove(singleDataType);
             singleDataType = 
RecordFieldType.RECORD.getRecordDataType(mergedSchema);
+            possibleDataTypes.add(singleDataType);
             return;
         }
 
-        if (singleFieldType.isWiderThan(additionalFieldType)) {
-            // Assigned type is already wide enough to encompass the given type
-            return;
+        if (possibleDataTypes.isEmpty()) {
+            possibleDataTypes.add(singleDataType);
         }
 
-        if (additionalFieldType.isWiderThan(singleFieldType)) {
-            // The given type is wide enough to encompass the assigned type. 
So changed the assigned type to the given type.
-            singleDataType = dataType;
-            return;
+        for (DataType possibleDataType : possibleDataTypes) {
+            RecordFieldType possibleFieldType = 
possibleDataType.getFieldType();
+            if (!possibleFieldType.equals(RecordFieldType.STRING) && 
possibleFieldType.isWiderThan(additionalFieldType)) {
+                return;
+            }
         }
 
-        if (possibleDataTypes == null) {
-            possibleDataTypes = new HashSet<>();
-            possibleDataTypes.add(singleDataType);
+        Iterator<DataType> possibleDataTypeIterator = 
possibleDataTypes.iterator();
+        while (possibleDataTypeIterator.hasNext()) {
+            DataType possibleDataType = possibleDataTypeIterator.next();
+            RecordFieldType possibleFieldType = 
possibleDataType.getFieldType();
+
+            if (!additionalFieldType.equals(RecordFieldType.STRING) && 
additionalFieldType.isWiderThan(possibleFieldType)) {
+                possibleDataTypeIterator.remove();
+            }
         }
 
         possibleDataTypes.add(dataType);
     }
 
-
     /**
      * Creates a single DataType that represents the field
      * @return a single DataType that represents the field
      */
     public DataType toDataType() {
-        if (possibleDataTypes == null) {
+        if (possibleDataTypes.isEmpty()) {
             if (singleDataType == null) {
                 return DEFAULT_DATA_TYPE;
             }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
index 9dc8f29..b8d6685 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVSchemaInference.java
@@ -71,7 +71,13 @@ public class TestCSVSchemaInference {
         
assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
 schema.getDataType("timestamp").get());
         assertEquals(RecordFieldType.TIME.getDataType("HH:mm:ss"), 
schema.getDataType("eventTime").get());
         assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), 
schema.getDataType("eventDate").get());
-        assertEquals(RecordFieldType.STRING.getDataType(), 
schema.getDataType("maybeTime").get());
+        assertEquals(
+                RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.TIME.getDataType("HH:mm:ss"),
+                        RecordFieldType.STRING.getDataType()
+                ),
+                schema.getDataType("maybeTime").get()
+        );
         assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), 
schema.getDataType("maybeDate").get());
 
         assertSame(RecordFieldType.INT, 
schema.getDataType("parentIds").get().getFieldType());
@@ -118,7 +124,13 @@ public class TestCSVSchemaInference {
         
assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
 schema.getDataType("timestamp").get());
         assertEquals(RecordFieldType.TIME.getDataType("HH:mm:ss"), 
schema.getDataType("eventTime").get());
         assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), 
schema.getDataType("eventDate").get());
-        assertEquals(RecordFieldType.STRING.getDataType(), 
schema.getDataType("maybeTime").get());
+        assertEquals(
+                RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.TIME.getDataType("HH:mm:ss"),
+                        RecordFieldType.STRING.getDataType()
+                ),
+                schema.getDataType("maybeTime").get()
+        );
         assertEquals(RecordFieldType.DATE.getDataType("yyyy-MM-dd"), 
schema.getDataType("maybeDate").get());
 
         assertSame(RecordFieldType.INT, 
schema.getDataType("parentIds").get().getFieldType());
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
index a012ebb..1cfaafd 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
@@ -117,7 +117,7 @@ public class TestWriteCSVResult {
 
         final String values = splits[1];
         final StringBuilder expectedBuilder = new StringBuilder();
-        
expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\""
 + dateValue + "\",\"" + timeValue + "\",\"" + timestampValue + 
"\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
+        
expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\""
 + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + 
"\",\"c\",\"a孟bc李12儒3\",,\"48\",,");
 
         final String expectedValues = expectedBuilder.toString();
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
index 4a10356..a4f356f 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
@@ -153,7 +153,10 @@ public class TestInferJsonSchemaAccessStrategy {
 
         // TIME value and a STRING should be inferred as a STRING field
         final RecordField maybeTimeField = schema.getField("maybeTime").get();
-        assertEquals(RecordFieldType.STRING, 
maybeTimeField.getDataType().getFieldType());
+        assertEquals(
+                RecordFieldType.CHOICE.getChoiceDataType().getFieldType(),
+                maybeTimeField.getDataType().getFieldType())
+        ;
 
         // DATE value and a null value should be inferred as a DATE field
         final RecordField maybeDateField = schema.getField("maybeDate").get();
@@ -169,7 +172,7 @@ public class TestInferJsonSchemaAccessStrategy {
         final RecordSchema schema = inferSchema(file);
 
         assertSame(RecordFieldType.STRING, 
schema.getDataType("name").get().getFieldType());
-        assertSame(RecordFieldType.STRING, 
schema.getDataType("age").get().getFieldType());
+        assertSame(RecordFieldType.CHOICE, 
schema.getDataType("age").get().getFieldType());
 
         final DataType valuesDataType = schema.getDataType("values").get();
         assertSame(RecordFieldType.CHOICE, valuesDataType.getFieldType());
@@ -178,7 +181,10 @@ public class TestInferJsonSchemaAccessStrategy {
         final List<DataType> possibleTypes = 
valuesChoiceType.getPossibleSubTypes();
         assertEquals(2, possibleTypes.size());
         
assertTrue(possibleTypes.contains(RecordFieldType.STRING.getDataType()));
-        
assertTrue(possibleTypes.contains(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
+        
assertTrue(possibleTypes.contains(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.LONG.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        ))));
 
         assertSame(RecordFieldType.STRING, 
schema.getDataType("nullValue").get().getFieldType());
     }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
new file mode 100644
index 0000000..caf0038
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/schema/inference/TestFieldTypeInference.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.inference;
+
+import com.google.common.collect.Sets;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import static com.google.common.collect.Collections2.permutations;
+import static org.junit.Assert.assertEquals;
+
+public class TestFieldTypeInference {
+    private FieldTypeInference testSubject;
+
+    @Before
+    public void setUp() throws Exception {
+        testSubject = new FieldTypeInference();
+    }
+
+    @Test
+    public void testToDataTypeWith_SHORT_INT_LONG_shouldReturn_LONG() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.SHORT.getDataType(),
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.LONG.getDataType()
+        );
+
+        DataType expected = RecordFieldType.LONG.getDataType();
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, 
dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_FLOAT_ShouldReturn_INT_FLOAT() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, 
dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWith_INT_STRING_shouldReturn_INT_STRING() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+
+        Set<DataType> expected = Sets.newHashSet(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, 
dataTypes, expected);
+    }
+
+    @Test
+    public void 
testToDataTypeWith_INT_FLOAT_STRING_shouldReturn_INT_FLOAT_STRING() {
+        // GIVEN
+        List<DataType> dataTypes = Arrays.asList(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        Set<DataType> expected = Sets.newHashSet(
+                RecordFieldType.INT.getDataType(),
+                RecordFieldType.FLOAT.getDataType(),
+                RecordFieldType.STRING.getDataType()
+        );
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnChoice, 
dataTypes, expected);
+    }
+
+    @Test
+    public void testToDataTypeWithMultipleRecord() {
+        // GIVEN
+        String fieldName = "fieldName";
+        DataType fieldType1 = RecordFieldType.INT.getDataType();
+        DataType fieldType2 = RecordFieldType.FLOAT.getDataType();
+        DataType fieldType3 = RecordFieldType.STRING.getDataType();
+
+        List<DataType> dataTypes = Arrays.asList(
+                
RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, 
fieldType1)),
+                
RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, 
fieldType2)),
+                
RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, 
fieldType3)),
+                
RecordFieldType.RECORD.getRecordDataType(createRecordSchema(fieldName, 
fieldType2))
+        );
+
+        DataType expected = 
RecordFieldType.RECORD.getRecordDataType(createRecordSchema(
+                fieldName,
+                RecordFieldType.CHOICE.getChoiceDataType(
+                        fieldType1,
+                        fieldType2,
+                        fieldType3
+                )
+        ));
+
+        // WHEN
+        // THEN
+        runWithAllPermutations(this::testToDataTypeShouldReturnSingleType, 
dataTypes, expected);
+    }
+
+    private SimpleRecordSchema createRecordSchema(String fieldName, DataType 
fieldType) {
+        return new SimpleRecordSchema(Arrays.asList(
+                new RecordField(fieldName, fieldType)
+        ));
+    }
+
+    private <I, E> void runWithAllPermutations(BiFunction<List<I>, E, ?> test, 
List<I> input, E expected) {
+        permutations(input).forEach(inputPermutation -> 
test.apply(inputPermutation, expected));
+    }
+
+    private Void testToDataTypeShouldReturnChoice(List<DataType> dataTypes, 
Set<DataType> expected) {
+        // GIVEN
+        dataTypes.forEach(testSubject::addPossibleDataType);
+
+        // WHEN
+        DataType actual = testSubject.toDataType();
+
+        // THEN
+        assertEquals(expected, new HashSet<>(((ChoiceDataType) 
actual).getPossibleSubTypes()));
+
+        return null;
+    }
+
+    private Void testToDataTypeShouldReturnSingleType(List<DataType> 
dataTypes, DataType expected) {
+        // GIVEN
+        dataTypes.forEach(testSubject::addPossibleDataType);
+
+        // WHEN
+        DataType actual = testSubject.toDataType();
+
+        // THEN
+        assertEquals(expected, actual);
+
+        return null;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestInferXmlSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestInferXmlSchema.java
index 75d6988..56ae4e6 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestInferXmlSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestInferXmlSchema.java
@@ -85,7 +85,13 @@ public class TestInferXmlSchema {
         assertSame(RecordFieldType.STRING, 
schema.getDataType("COUNTRY").get().getFieldType());
 
         
assertEquals(RecordFieldType.DATE.getDataType(timeValueInference.getDateFormat()),
 schema.getDataType("DOB").get());
-        assertEquals(RecordFieldType.STRING.getDataType(), 
schema.getDataType("TOB").get());
+        assertEquals(
+                RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.TIME.getDataType("HH:mm:ss"),
+                        RecordFieldType.STRING.getDataType()
+                ),
+                schema.getDataType("TOB").get()
+        );
         
assertEquals(RecordFieldType.TIMESTAMP.getDataType(timeValueInference.getTimestampFormat()),
 schema.getDataType("TSOB").get());
 
         final DataType addressDataType = schema.getDataType("ADDRESS").get();
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
index b4c73f8..0472512 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/output/dataTypes.json
@@ -7,9 +7,9 @@
   "bigint" : 8,
   "float" : 8.0,
   "double" : 8.0,
+  "timestamp" : "2017-01-01 17:00:00",
   "date" : "2017-01-01",
   "time" : "17:00:00",
-  "timestamp" : "2017-01-01 17:00:00",
   "char" : "c",
   "string" : "string",
   "record" : null,

Reply via email to