This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b1c14f9cbc3f4db0c4c14a8ba55edbccef76729
Author: hequn8128 <chenghe...@gmail.com>
AuthorDate: Tue Apr 16 15:07:43 2019 +0800

    [FLINK-12204][jdbc] Improve JDBCOutputFormat ClassCastException.
    
    This closes #8182.
---
 .../flink/api/java/io/jdbc/JDBCOutputFormat.java      |  8 ++++++--
 .../apache/flink/api/java/io/jdbc/JDBCFullTest.java   | 19 +++++++++++++++++++
 .../apache/flink/api/java/io/jdbc/JDBCTestBase.java   |  5 +++++
 3 files changed, 30 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 444485c..f773635 100644
--- 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -198,8 +198,12 @@ public class JDBCOutputFormat extends 
RichOutputFormat<Row> {
                                                                        // case 
java.sql.Types.STRUC
                                                        }
                                                } catch (ClassCastException e) {
-                                                       throw new 
RuntimeException(
-                                                               "Field index: " 
+ index + ", field value: " + row.getField(index) + " " + e.getMessage(), e);
+                                                       // enrich the exception 
with detailed information.
+                                                       String errorMessage = 
String.format(
+                                                               "%s, field 
index: %s, field value: %s.", e.getMessage(), index, row.getField(index));
+                                                       ClassCastException 
enrichedException = new ClassCastException(errorMessage);
+                                                       
enrichedException.setStackTrace(e.getStackTrace());
+                                                       throw enrichedException;
                                                }
                                        }
                                }
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index c1f2b25..51d39b2 100644
--- 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -50,6 +50,25 @@ public class JDBCFullTest extends JDBCTestBase {
                runTest(true);
        }
 
+       @Test
+       public void testEnrichedClassCastException() throws Exception {
+               exception.expect(ClassCastException.class);
+               exception.expectMessage(
+                       "java.lang.String cannot be cast to java.lang.Double, 
field index: 3, field value: 11.11.");
+
+               JDBCOutputFormat jdbcOutputFormat = 
JDBCOutputFormat.buildJDBCOutputFormat()
+                       .setDrivername(JDBCTestBase.DRIVER_CLASS)
+                       .setDBUrl(JDBCTestBase.DB_URL)
+                       .setQuery("insert into newbooks (id, title, author, 
price, qty) values (?,?,?,?,?)")
+                       .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, 
Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
+                       .finish();
+
+               jdbcOutputFormat.open(1, 1);
+               Row inputRow = Row.of(1001, "Java public for dummies", "Tan Ah 
Teck", "11.11", 11);
+               jdbcOutputFormat.writeRecord(inputRow);
+               jdbcOutputFormat.close();
+       }
+
        private void runTest(boolean exploitParallelism) throws Exception {
                ExecutionEnvironment environment = 
ExecutionEnvironment.getExecutionEnvironment();
                JDBCInputFormatBuilder inputBuilder = 
JDBCInputFormat.buildJDBCInputFormat()
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 1d41d37..febbbd3 100644
--- 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
 
 import java.io.OutputStream;
 import java.sql.Connection;
@@ -35,6 +37,9 @@ import java.sql.Statement;
  */
 public class JDBCTestBase {
 
+       @Rule
+       public ExpectedException exception = ExpectedException.none();
+
        public static final String DRIVER_CLASS = 
"org.apache.derby.jdbc.EmbeddedDriver";
        public static final String DB_URL = "jdbc:derby:memory:ebookshop";
        public static final String INPUT_TABLE = "books";

Reply via email to