[ https://issues.apache.org/jira/browse/FLINK-20722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253488#comment-17253488 ]
Rui Li commented on FLINK-20722: -------------------------------- I did a little debugging and I think it's because in {{BroadcastingOutput::collect}} we broadcast the same StreamRecord to all the outputs. However, HiveTableSink [converts|https://github.com/apache/flink/blob/release-1.12.0/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java#L209] RowData to Row. Therefore when we pass the record to the 2nd output in BroadcastingOutput, the output finds out the data is a Row and thus the exception. > Fail to insert into hive table due to ClassCastException > -------------------------------------------------------- > > Key: FLINK-20722 > URL: https://issues.apache.org/jira/browse/FLINK-20722 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Reporter: Rui Li > Priority: Major > Fix For: 1.13.0 > > > Add the following test in {{TableEnvHiveConnectorITCase}} to reproduce the > issue: > {code} > @Test > public void test() throws Exception { > TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); > tableEnv.executeSql("create table src1(key string, val > string)"); > tableEnv.executeSql("create table src2(key string, val > string)"); > tableEnv.executeSql("create table dest(key string, val > string)"); > HiveTestUtils.createTextTableInserter(hiveCatalog, "default", > "src1") > .addRow(new Object[]{"1", "val1"}) > .addRow(new Object[]{"2", "val2"}) > .addRow(new Object[]{"3", "val3"}) > .commit(); > HiveTestUtils.createTextTableInserter(hiveCatalog, "default", > "src2") > .addRow(new Object[]{"3", "val4"}) > .addRow(new Object[]{"4", "val4"}) > .commit(); > tableEnv.executeSql("INSERT OVERWRITE dest\n" + > "SELECT j.*\n" + > "FROM (SELECT t1.key, p1.val\n" + > " FROM src2 t1\n" + > " LEFT OUTER JOIN src1 p1\n" + > " ON (t1.key = p1.key)\n" + > " UNION ALL\n" + > " SELECT t2.key, p2.val\n" + > " FROM src2 t2\n" + > " LEFT OUTER JOIN src1 p2\n" + > " ON (t2.key = p2.key)) j").await(); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)