[ 
https://issues.apache.org/jira/browse/FLINK-33732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lekelei updated FLINK-33732:
----------------------------
    Description: 
Here is my sql:
```

SET 'execution.runtime-mode' = 'streaming';
SET 'table.dynamic-table-options.enabled' = 'true';
SET 'table.exec.source.cdc-events-duplicate' = 'false';
SET 'pipeline.operator-chaining' = 'false';

CREATE CATALOG catalog_hive WITH (
'type' = 'hive',

...
);

 

create table kafka_source(
 item1 STRING,
item2 INT,
item3 string,
PRIMARY KEY (item1, item2) NOT ENFORCED,
process_time as proctime()
-- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'format' = 'json',

...
);

 

CREATE TABLE blackhole_sink (
comp STRING,
order STRING,
order_line INT,
order_sequence INT,
material_code STRING,
warehouse_code STRING,
quantity DOUBLE
)WITH (
'connector' = 'blackhole'
);

 

insert into
blackhole_sink

select item1,comp from kafka_source a left join 

catalog_hive.db.hive_lookup_tb 

/*+ OPTIONS('streaming-source.partition.include'='latest',
'streaming-source.monitor-interval'='60 min','streaming-source.enable'='true') 
*/
FOR SYSTEM_TIME AS OF kafka_source.process_time as b on a.item1 = b.comp;

```
The error stack is as follows:
java.io.IOException: java.io.IOException: 
java.lang.ArrayIndexOutOfBoundsException: 10
    at 
com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700)
 ~[classes/:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_332]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 ~[hadoop-common-2.7.5.jar:?]

  was:
Here is my sql:
```

SET 'execution.runtime-mode' = 'streaming';
SET 'table.dynamic-table-options.enabled' = 'true';
SET 'table.exec.source.cdc-events-duplicate' = 'false';
SET 'pipeline.operator-chaining' = 'false';


CREATE CATALOG catalog_hive WITH (
'type' = 'hive',

...
);

 

create table kafka_source(
 item1 STRING,
item2 INT,
item3 string,
PRIMARY KEY (item1, item2) NOT ENFORCED,
process_time as proctime()
-- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'format' = 'json',

...
);

 

CREATE TABLE blackhole_sink (
comp STRING,
order STRING,
order_line INT,
order_sequence INT,
material_code STRING,
warehouse_code STRING,
quantity DOUBLE
)WITH (
'connector' = 'blackhole'
);

 

insert into
dwd_pd_purchase_received_detail_arctic_rt_180502_test

select item1,comp from kafka_source a left join 

catalog_hive.db.hive_lookup_tb 

/*+ OPTIONS('streaming-source.partition.include'='latest',
'streaming-source.monitor-interval'='60 min','streaming-source.enable'='true') 
*/
FOR SYSTEM_TIME AS OF kafka_source.process_time as b on a.item1 = b.comp;

```
The error stack is as follows:
java.io.IOException: java.io.IOException: 
java.lang.ArrayIndexOutOfBoundsException: 10
    at 
com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700)
 ~[classes/:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_332]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 ~[hadoop-common-2.7.5.jar:?]


> Hive Lookup Join ProjectPushDown will encounter ArrayIndexOutOfBoundsException
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-33732
>                 URL: https://issues.apache.org/jira/browse/FLINK-33732
>             Project: Flink
>          Issue Type: Bug
>            Reporter: lekelei
>            Priority: Major
>
> Here is my sql:
> ```
> SET 'execution.runtime-mode' = 'streaming';
> SET 'table.dynamic-table-options.enabled' = 'true';
> SET 'table.exec.source.cdc-events-duplicate' = 'false';
> SET 'pipeline.operator-chaining' = 'false';
> CREATE CATALOG catalog_hive WITH (
> 'type' = 'hive',
> ...
> );
>  
> create table kafka_source(
>  item1 STRING,
> item2 INT,
> item3 string,
> PRIMARY KEY (item1, item2) NOT ENFORCED,
> process_time as proctime()
> -- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'format' = 'json',
> ...
> );
>  
> CREATE TABLE blackhole_sink (
> comp STRING,
> order STRING,
> order_line INT,
> order_sequence INT,
> material_code STRING,
> warehouse_code STRING,
> quantity DOUBLE
> )WITH (
> 'connector' = 'blackhole'
> );
>  
> insert into
> blackhole_sink
> select item1,comp from kafka_source a left join 
> catalog_hive.db.hive_lookup_tb 
> /*+ OPTIONS('streaming-source.partition.include'='latest',
> 'streaming-source.monitor-interval'='60 
> min','streaming-source.enable'='true') */
> FOR SYSTEM_TIME AS OF kafka_source.process_time as b on a.item1 = b.comp;
> ```
> The error stack is as follows:
> java.io.IOException: java.io.IOException: 
> java.lang.ArrayIndexOutOfBoundsException: 10
>     at 
> com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700)
>  ~[classes/:?]
>     at java.security.AccessController.doPrivileged(Native Method) 
> ~[?:1.8.0_332]
>     at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
>     at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  ~[hadoop-common-2.7.5.jar:?]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to