This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b1c14f9cbc3f4db0c4c14a8ba55edbccef76729
Author: hequn8128 <chenghe...@gmail.com>
AuthorDate: Tue Apr 16 15:07:43 2019 +0800

    [FLINK-12204][jdbc] Improve JDBCOutputFormat ClassCastException.

    This closes #8182.
---
 .../flink/api/java/io/jdbc/JDBCOutputFormat.java    |  8 ++++++--
 .../apache/flink/api/java/io/jdbc/JDBCFullTest.java | 19 +++++++++++++++++++
 .../apache/flink/api/java/io/jdbc/JDBCTestBase.java |  5 +++++
 3 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 444485c..f773635 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -198,8 +198,12 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 									// case java.sql.Types.STRUC
 							}
 						} catch (ClassCastException e) {
-							throw new RuntimeException(
-								"Field index: " + index + ", field value: " + row.getField(index) + " " + e.getMessage(), e);
+							// enrich the exception with detailed information.
+							String errorMessage = String.format(
+								"%s, field index: %s, field value: %s.", e.getMessage(), index, row.getField(index));
+							ClassCastException enrichedException = new ClassCastException(errorMessage);
+							enrichedException.setStackTrace(e.getStackTrace());
+							throw enrichedException;
 						}
 					}
 				}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index c1f2b25..51d39b2 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -50,6 +50,25 @@ public class JDBCFullTest extends JDBCTestBase {
 		runTest(true);
 	}
 
+	@Test
+	public void testEnrichedClassCastException() throws Exception {
+		exception.expect(ClassCastException.class);
+		exception.expectMessage(
+			"java.lang.String cannot be cast to java.lang.Double, field index: 3, field value: 11.11.");
+
+		JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+			.setDrivername(JDBCTestBase.DRIVER_CLASS)
+			.setDBUrl(JDBCTestBase.DB_URL)
+			.setQuery("insert into newbooks (id, title, author, price, qty) values (?,?,?,?,?)")
+			.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
+			.finish();
+
+		jdbcOutputFormat.open(1, 1);
+		Row inputRow = Row.of(1001, "Java public for dummies", "Tan Ah Teck", "11.11", 11);
+		jdbcOutputFormat.writeRecord(inputRow);
+		jdbcOutputFormat.close();
+	}
+
 	private void runTest(boolean exploitParallelism) throws Exception {
 		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
 		JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 1d41d37..febbbd3 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
 
 import java.io.OutputStream;
 import java.sql.Connection;
@@ -35,6 +37,9 @@ import java.sql.Statement;
  */
 public class JDBCTestBase {
 
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
+
 	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
 	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
 	public static final String INPUT_TABLE = "books";