[ 
https://issues.apache.org/jira/browse/FLINK-20722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253488#comment-17253488
 ] 

Rui Li commented on FLINK-20722:
--------------------------------

I did a little debugging and I think it's because in 
{{BroadcastingOutput::collect}} we broadcast the same StreamRecord to all the 
outputs. However, HiveTableSink 
[converts|https://github.com/apache/flink/blob/release-1.12.0/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java#L209]
 RowData to Row. Therefore when we pass the record to the 2nd output in 
BroadcastingOutput, the output finds out the data is a Row and thus the 
exception.

> Fail to insert into hive table due to ClassCastException
> --------------------------------------------------------
>
>                 Key: FLINK-20722
>                 URL: https://issues.apache.org/jira/browse/FLINK-20722
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: Rui Li
>            Priority: Major
>             Fix For: 1.13.0
>
>
> Add the following test in {{TableEnvHiveConnectorITCase}} to reproduce the 
> issue:
> {code}
>       @Test
>       public void test() throws Exception {
>               TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
>               tableEnv.executeSql("create table src1(key string, val 
> string)");
>               tableEnv.executeSql("create table src2(key string, val 
> string)");
>               tableEnv.executeSql("create table dest(key string, val 
> string)");
>               HiveTestUtils.createTextTableInserter(hiveCatalog, "default", 
> "src1")
>                               .addRow(new Object[]{"1", "val1"})
>                               .addRow(new Object[]{"2", "val2"})
>                               .addRow(new Object[]{"3", "val3"})
>                               .commit();
>               HiveTestUtils.createTextTableInserter(hiveCatalog, "default", 
> "src2")
>                               .addRow(new Object[]{"3", "val4"})
>                               .addRow(new Object[]{"4", "val4"})
>                               .commit();
>               tableEnv.executeSql("INSERT OVERWRITE dest\n" +
>                               "SELECT j.*\n" +
>                               "FROM (SELECT t1.key, p1.val\n" +
>                               "      FROM src2 t1\n" +
>                               "      LEFT OUTER JOIN src1 p1\n" +
>                               "      ON (t1.key = p1.key)\n" +
>                               "      UNION ALL\n" +
>                               "      SELECT t2.key, p2.val\n" +
>                               "      FROM src2 t2\n" +
>                               "      LEFT OUTER JOIN src1 p2\n" +
>                               "      ON (t2.key = p2.key)) j").await();
>       }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to