[ https://issues.apache.org/jira/browse/FLINK-33732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lekelei updated FLINK-33732:
----------------------------
    Description:
Here is my SQL:
```
SET 'execution.runtime-mode' = 'streaming';
SET 'table.dynamic-table-options.enabled' = 'true';
SET 'table.exec.source.cdc-events-duplicate' = 'false';
SET 'pipeline.operator-chaining' = 'false';

CREATE CATALOG catalog_hive WITH (
  'type' = 'hive',
  ...
);

create table kafka_source(
  item1 STRING,
  item2 INT,
  item3 string,
  PRIMARY KEY (item1, item2) NOT ENFORCED,
  process_time as proctime()
  -- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  ...
);

CREATE TABLE blackhole_sink (
  comp STRING,
  order STRING,
  order_line INT,
  order_sequence INT,
  material_code STRING,
  warehouse_code STRING,
  quantity DOUBLE
)WITH (
  'connector' = 'blackhole'
);

insert into blackhole_sink
select item1,comp from kafka_source a left join catalog_hive.db.hive_lookup_tb
/*+ OPTIONS('streaming-source.partition.include'='latest', 'streaming-source.monitor-interval'='60 min','streaming-source.enable'='true') */
FOR SYSTEM_TIME AS OF kafka_source.process_time as b on a.item1 = b.comp;
```
The error stack is as follows:
java.io.IOException: java.io.IOException: java.lang.ArrayIndexOutOfBoundsException: 10
    at com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700) ~[classes/:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_332]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) ~[hadoop-common-2.7.5.jar:?]

  was:
Here is my SQL:
```
SET 'execution.runtime-mode' = 'streaming';
SET 'table.dynamic-table-options.enabled' = 'true';
SET 'table.exec.source.cdc-events-duplicate' = 'false';
SET 'pipeline.operator-chaining' = 'false';

CREATE CATALOG catalog_hive WITH (
  'type' = 'hive',
  ...
);

create table kafka_source(
  item1 STRING,
  item2 INT,
  item3 string,
  PRIMARY KEY (item1, item2) NOT ENFORCED,
  process_time as proctime()
  -- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  ...
);

CREATE TABLE blackhole_sink (
  comp STRING,
  order STRING,
  order_line INT,
  order_sequence INT,
  material_code STRING,
  warehouse_code STRING,
  quantity DOUBLE
)WITH (
  'connector' = 'blackhole'
);

insert into dwd_pd_purchase_received_detail_arctic_rt_180502_test
select item1,comp from kafka_source a left join catalog_hive.db.hive_lookup_tb
/*+ OPTIONS('streaming-source.partition.include'='latest', 'streaming-source.monitor-interval'='60 min','streaming-source.enable'='true') */
FOR SYSTEM_TIME AS OF kafka_source.process_time as b on a.item1 = b.comp;
```
The error stack is as follows:
java.io.IOException: java.io.IOException: java.lang.ArrayIndexOutOfBoundsException: 10
    at com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700) ~[classes/:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_332]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) ~[hadoop-common-2.7.5.jar:?]

> Hive Lookup Join ProjectPushDown will encounter ArrayIndexOutOfBoundsException
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-33732
>                 URL: https://issues.apache.org/jira/browse/FLINK-33732
>             Project: Flink
>          Issue Type: Bug
>            Reporter: lekelei
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)