Repository: nifi
Updated Branches:
  refs/heads/master e24388aa7 -> 0dd382370


NIFI-5612: Support JDBC drivers that return Long for unsigned ints

Refactors tests in order to share code repeated in tests and to enable
some parameterized testing.

MySQL Connector/J 5.1.x in conjunction with MySQL 5.0.x will return
a Long for ResultSet#getObject when the SQL type is an unsigned integer,
which can cause an UnresolvedUnionException when the record is appended
to the Avro stream. This change prevents that error from occurring and
also throws a more informative exception that describes the failing
object's Java class in addition to its string value.

Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #3032


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0dd38237
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0dd38237
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0dd38237

Branch: refs/heads/master
Commit: 0dd382370bf139e0f8c1b22761e4aa306943dd77
Parents: e24388a
Author: Colin Dean <colin.d...@arcadia.io>
Authored: Wed Sep 19 20:27:47 2018 -0400
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Fri Sep 28 13:46:04 2018 -0400

----------------------------------------------------------------------
 .../processors/standard/util/JdbcCommon.java    |  25 ++-
 .../standard/util/JdbcCommonTestUtils.java      |  60 ++++++++
 .../standard/util/TestJdbcCommon.java           |  84 +++++++---
 .../util/TestJdbcCommonConvertToAvro.java       | 152 +++++++++++++++++++
 4 files changed, 293 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0dd38237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 03761c6..9681e2f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -91,11 +91,13 @@ import org.apache.avro.SchemaBuilder.FieldAssembler;
 import org.apache.avro.SchemaBuilder.NullDefault;
 import org.apache.avro.SchemaBuilder.UnionAccumulator;
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.UnresolvedUnionException;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -449,8 +451,11 @@ public class JdbcCommon {
                             } else {
                                 rec.put(i - 1, value);
                             }
+                        } else if ((value instanceof Long) && 
meta.getPrecision(i) < MAX_DIGITS_IN_INT) {
+                            int intValue = ((Long)value).intValue();
+                            rec.put(i-1, intValue);
                         } else {
-                            rec.put(i - 1, value);
+                            rec.put(i-1, value);
                         }
 
                     } else if (value instanceof Date) {
@@ -470,8 +475,22 @@ public class JdbcCommon {
                         rec.put(i - 1, value.toString());
                     }
                 }
-                dataFileWriter.append(rec);
-                nrOfRows += 1;
+                try {
+                    dataFileWriter.append(rec);
+                    nrOfRows += 1;
+                } catch (DataFileWriter.AppendWriteException awe) {
+                    Throwable rootCause = ExceptionUtils.getRootCause(awe);
+                    if(rootCause instanceof UnresolvedUnionException) {
+                        UnresolvedUnionException uue = 
(UnresolvedUnionException) rootCause;
+                        throw new RuntimeException(
+                                "Unable to resolve union for value " + 
uue.getUnresolvedDatum() +
+                                " with type " + 
uue.getUnresolvedDatum().getClass().getCanonicalName() +
+                                " while appending record " + rec,
+                                awe);
+                    } else {
+                        throw awe;
+                    }
+                }
 
                 if (options.maxRows > 0 && nrOfRows == options.maxRows)
                     break;

http://git-wip-us.apache.org/repos/asf/nifi/blob/0dd38237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java
new file mode 100644
index 0000000..ad57158
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class JdbcCommonTestUtils {
+    static ResultSet resultSetReturningMetadata(ResultSetMetaData metadata) 
throws SQLException {
+        final ResultSet rs = mock(ResultSet.class);
+        when(rs.getMetaData()).thenReturn(metadata);
+
+        final AtomicInteger counter = new AtomicInteger(1);
+        Mockito.doAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocation) throws 
Throwable {
+                return counter.getAndDecrement() > 0;
+            }
+        }).when(rs).next();
+
+        return rs;
+    }
+
+    static InputStream convertResultSetToAvroInputStream(ResultSet rs) throws 
SQLException, IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        JdbcCommon.convertToAvroStream(rs, baos, false);
+
+        final byte[] serializedBytes = baos.toByteArray();
+
+        return new ByteArrayInputStream(serializedBytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0dd38237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
index 5eca32a..9cf4fc1 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import static 
org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream;
+import static 
org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -425,16 +427,7 @@ public class TestJdbcCommon {
         when(metadata.getPrecision(1)).thenReturn(dbPrecision);
         when(metadata.getScale(1)).thenReturn(expectedScale);
 
-        final ResultSet rs = mock(ResultSet.class);
-        when(rs.getMetaData()).thenReturn(metadata);
-
-        final AtomicInteger counter = new AtomicInteger(1);
-        Mockito.doAnswer(new Answer<Boolean>() {
-            @Override
-            public Boolean answer(InvocationOnMock invocation) throws 
Throwable {
-                return counter.getAndDecrement() > 0;
-            }
-        }).when(rs).next();
+        final ResultSet rs = resultSetReturningMetadata(metadata);
 
         when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal);
 
@@ -587,34 +580,75 @@ public class TestJdbcCommon {
         when(metadata.getColumnName(1)).thenReturn("t_int");
         when(metadata.getTableName(1)).thenReturn("table");
 
-        final ResultSet rs = mock(ResultSet.class);
-        when(rs.getMetaData()).thenReturn(metadata);
-
-        final AtomicInteger counter = new AtomicInteger(1);
-        Mockito.doAnswer(new Answer<Boolean>() {
-            @Override
-            public Boolean answer(InvocationOnMock invocation) throws 
Throwable {
-                return counter.getAndDecrement() > 0;
-            }
-        }).when(rs).next();
+        final ResultSet rs = resultSetReturningMetadata(metadata);
 
         final short s = 25;
         when(rs.getObject(Mockito.anyInt())).thenReturn(s);
 
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final InputStream instream = convertResultSetToAvroInputStream(rs);
 
-        JdbcCommon.convertToAvroStream(rs, baos, false);
+        final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                record = dataFileReader.next(record);
+                assertEquals(Short.toString(s), 
record.get("t_int").toString());
+            }
+        }
+    }
 
-        final byte[] serializedBytes = baos.toByteArray();
+    @Test
+    public void 
testConvertToAvroStreamForUnsignedIntegerWithPrecision1ReturnedAsLong_NIFI5612()
 throws SQLException, IOException {
+        final String mockColumnName = "t_int";
+        final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
+        when(metadata.getColumnCount()).thenReturn(1);
+        when(metadata.getColumnType(1)).thenReturn(Types.INTEGER);
+        when(metadata.isSigned(1)).thenReturn(false);
+        when(metadata.getPrecision(1)).thenReturn(1);
+        when(metadata.getColumnName(1)).thenReturn(mockColumnName);
+        when(metadata.getTableName(1)).thenReturn("table");
 
-        final InputStream instream = new ByteArrayInputStream(serializedBytes);
+        final ResultSet rs = resultSetReturningMetadata(metadata);
+
+        final Long ret = 0L;
+        when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
+
+        final InputStream instream = convertResultSetToAvroInputStream(rs);
 
         final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
         try (final DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(instream, datumReader)) {
             GenericRecord record = null;
             while (dataFileReader.hasNext()) {
                 record = dataFileReader.next(record);
-                assertEquals(Short.toString(s), 
record.get("t_int").toString());
+                assertEquals(Long.toString(ret), 
record.get(mockColumnName).toString());
+            }
+        }
+    }
+
+    @Test
+    public void testConvertToAvroStreamForUnsignedIntegerWithPrecision10() 
throws SQLException, IOException {
+        final String mockColumnName = "t_int";
+        final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
+        when(metadata.getColumnCount()).thenReturn(1);
+        when(metadata.getColumnType(1)).thenReturn(Types.INTEGER);
+        when(metadata.isSigned(1)).thenReturn(false);
+        when(metadata.getPrecision(1)).thenReturn(10);
+        when(metadata.getColumnName(1)).thenReturn(mockColumnName);
+        when(metadata.getTableName(1)).thenReturn("table");
+
+        final ResultSet rs = resultSetReturningMetadata(metadata);
+
+        final Long ret = 0L;
+        when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
+
+        final InputStream instream = convertResultSetToAvroInputStream(rs);
+
+        final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                record = dataFileReader.next(record);
+                assertEquals(Long.toString(ret), 
record.get(mockColumnName).toString());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0dd38237/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java
new file mode 100644
index 0000000..eb736e2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import static java.sql.Types.INTEGER;
+import static java.sql.Types.SMALLINT;
+import static java.sql.Types.TINYINT;
+import static java.sql.Types.BIGINT;
+import static 
org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream;
+import static 
org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Parameterized.class)
+public class TestJdbcCommonConvertToAvro {
+
+    private final static boolean SIGNED = true;
+    private final static boolean UNSIGNED = false;
+
+    private static int[] range(int start, int end) {
+        return IntStream.rangeClosed(start, end).toArray();
+    }
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Collection<TestParams> data() {
+        Map<Integer, int[]> typeWithPrecisionRange = new HashMap<>();
+        typeWithPrecisionRange.put(TINYINT, range(1,3));
+        typeWithPrecisionRange.put(SMALLINT, range(1,5));
+        typeWithPrecisionRange.put(INTEGER, range(1,9));
+
+        ArrayList<TestParams> params = new ArrayList<>();
+
+        typeWithPrecisionRange.forEach( (sqlType, precisions) -> {
+            for (int precision : precisions) {
+                params.add(new TestParams(sqlType, precision, SIGNED));
+                params.add(new TestParams(sqlType, precision, UNSIGNED));
+            }
+        });
+        // remove cases that we know should fail
+        params.removeIf(param ->
+            param.sqlType == INTEGER
+                    &&
+            param.precision == 9
+                    &&
+            param.signed == UNSIGNED
+        );
+
+        return params;
+    }
+
+    @Parameterized.Parameter
+    public TestParams testParams;
+
+    static class TestParams {
+        int sqlType;
+        int precision;
+        boolean signed;
+
+        TestParams(int sqlType, int precision, boolean signed) {
+            this.sqlType = sqlType;
+            this.precision = precision;
+            this.signed = signed;
+        }
+        private String humanReadableType() {
+            switch(sqlType){
+                case TINYINT:
+                    return "TINYINT";
+                case INTEGER:
+                    return "INTEGER";
+                case SMALLINT:
+                    return "SMALLINT";
+                case BIGINT:
+                    return "BIGINT";
+                default:
+                    return "UNKNOWN - ADD TO LIST";
+            }
+        }
+        private String humanReadableSigned() {
+            if(signed) return "SIGNED";
+            return "UNSIGNED";
+        }
+        public String toString(){
+            return String.format(
+                    "TestParams(SqlType=%s, Precision=%s, Signed=%s)",
+                    humanReadableType(),
+                    precision,
+                    humanReadableSigned());
+        }
+    }
+
+    @Test
+    public void testConvertToAvroStreamForNumbers() throws SQLException, 
IOException {
+        final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
+        when(metadata.getColumnCount()).thenReturn(1);
+        when(metadata.getColumnType(1)).thenReturn(testParams.sqlType);
+        when(metadata.isSigned(1)).thenReturn(testParams.signed);
+        when(metadata.getPrecision(1)).thenReturn(testParams.precision);
+        when(metadata.getColumnName(1)).thenReturn("t_int");
+        when(metadata.getTableName(1)).thenReturn("table");
+
+        final ResultSet rs = resultSetReturningMetadata(metadata);
+
+        final int ret = 0;
+        when(rs.getObject(Mockito.anyInt())).thenReturn(ret);
+
+        final InputStream instream = convertResultSetToAvroInputStream(rs);
+
+        final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                record = dataFileReader.next(record);
+                assertEquals(Integer.toString(ret), 
record.get("t_int").toString());
+            }
+        }
+    }
+}

Reply via email to