[jira] [Closed] (FLINK-33732) Hive Lookup Join ProjectPushDown will encounter ArrayIndexOutOfBoundsException

2023-12-03 Thread lekelei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lekelei closed FLINK-33732.
---
Resolution: Fixed

> Hive Lookup Join ProjectPushDown will encounter ArrayIndexOutOfBoundsException
> -
>
> Key: FLINK-33732
> URL: https://issues.apache.org/jira/browse/FLINK-33732
> Project: Flink
>  Issue Type: Bug
>Reporter: lekelei
>Priority: Major
>
> Here is my SQL:
> ```
> SET 'execution.runtime-mode' = 'streaming';
> SET 'table.dynamic-table-options.enabled' = 'true';
> SET 'table.exec.source.cdc-events-duplicate' = 'false';
> SET 'pipeline.operator-chaining' = 'false';
>
> CREATE CATALOG catalog_hive WITH (
>     'type' = 'hive',
>     ...
> );
>
> CREATE TABLE kafka_source (
>     item1 STRING,
>     item2 INT,
>     item3 STRING,
>     PRIMARY KEY (item1, item2) NOT ENFORCED,
>     process_time AS PROCTIME()
>     -- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'format' = 'json',
>     ...
> );
>
> CREATE TABLE blackhole_sink (
>     comp STRING,
>     `order` STRING,
>     order_line INT,
>     order_sequence INT,
>     material_code STRING,
>     warehouse_code STRING,
>     quantity DOUBLE
> ) WITH (
>     'connector' = 'blackhole'
> );
>
> INSERT INTO blackhole_sink
> SELECT a.item1, b.comp
> FROM kafka_source a
> LEFT JOIN catalog_hive.db.hive_lookup_tb
>     /*+ OPTIONS('streaming-source.partition.include' = 'latest',
>                 'streaming-source.monitor-interval' = '60 min',
>                 'streaming-source.enable' = 'true') */
>     FOR SYSTEM_TIME AS OF kafka_source.process_time AS b
>     ON a.item1 = b.comp;
> ```
> The error stack is as follows:
> java.io.IOException: java.io.IOException: 
> java.lang.ArrayIndexOutOfBoundsException: 10
>     at 
> com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700)
>  ~[classes/:?]
>     at java.security.AccessController.doPrivileged(Native Method) 
> ~[?:1.8.0_332]
>     at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
>     at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  ~[hadoop-common-2.7.5.jar:?]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33732) Hive Lookup Join ProjectPushDown will encounter ArrayIndexOutOfBoundsException

2023-12-03 Thread lekelei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792646#comment-17792646
 ] 

lekelei commented on FLINK-33732:
-

This issue duplicates [FLINK-30679] (Can not load the data of hive dim table when 
project-push-down is introduced), so it has been closed.
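For context, a minimal sketch of the pattern that appears to trigger the failure 
(the wide dimension table is an illustrative assumption, not taken from the 
report): only `comp` is read from the Hive dim table, so the planner pushes the 
projection into the lookup source, and per FLINK-30679 the Hive lookup reader can 
then still address fields by their indices in the full schema, producing the 
out-of-range access.

```
-- Hypothetical sketch: assume hive_lookup_tb is wider than the pruned projection,
-- e.g. columns (comp, c1, ..., c10). Selecting only `comp` activates project
-- push-down on the lookup source.
SELECT a.item1, b.comp
FROM kafka_source AS a
LEFT JOIN catalog_hive.db.hive_lookup_tb
    FOR SYSTEM_TIME AS OF a.process_time AS b
    ON a.item1 = b.comp;
```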

> Hive Lookup Join ProjectPushDown will encounter ArrayIndexOutOfBoundsException
> -
>
> Key: FLINK-33732
> URL: https://issues.apache.org/jira/browse/FLINK-33732
> Project: Flink
>  Issue Type: Bug
>Reporter: lekelei
>Priority: Major
>
> Here is my SQL:
> ```
> SET 'execution.runtime-mode' = 'streaming';
> SET 'table.dynamic-table-options.enabled' = 'true';
> SET 'table.exec.source.cdc-events-duplicate' = 'false';
> SET 'pipeline.operator-chaining' = 'false';
>
> CREATE CATALOG catalog_hive WITH (
>     'type' = 'hive',
>     ...
> );
>
> CREATE TABLE kafka_source (
>     item1 STRING,
>     item2 INT,
>     item3 STRING,
>     PRIMARY KEY (item1, item2) NOT ENFORCED,
>     process_time AS PROCTIME()
>     -- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'format' = 'json',
>     ...
> );
>
> CREATE TABLE blackhole_sink (
>     comp STRING,
>     `order` STRING,
>     order_line INT,
>     order_sequence INT,
>     material_code STRING,
>     warehouse_code STRING,
>     quantity DOUBLE
> ) WITH (
>     'connector' = 'blackhole'
> );
>
> INSERT INTO blackhole_sink
> SELECT a.item1, b.comp
> FROM kafka_source a
> LEFT JOIN catalog_hive.db.hive_lookup_tb
>     /*+ OPTIONS('streaming-source.partition.include' = 'latest',
>                 'streaming-source.monitor-interval' = '60 min',
>                 'streaming-source.enable' = 'true') */
>     FOR SYSTEM_TIME AS OF kafka_source.process_time AS b
>     ON a.item1 = b.comp;
> ```
> The error stack is as follows:
> java.io.IOException: java.io.IOException: 
> java.lang.ArrayIndexOutOfBoundsException: 10
>     at 
> com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700)
>  ~[classes/:?]
>     at java.security.AccessController.doPrivileged(Native Method) 
> ~[?:1.8.0_332]
>     at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
>     at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  ~[hadoop-common-2.7.5.jar:?]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33732) Hive Lookup Join ProjectPushDown will encounter ArrayIndexOutOfBoundsException

2023-12-03 Thread lekelei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lekelei updated FLINK-33732:

Description: 
Here is my SQL:
```
SET 'execution.runtime-mode' = 'streaming';
SET 'table.dynamic-table-options.enabled' = 'true';
SET 'table.exec.source.cdc-events-duplicate' = 'false';
SET 'pipeline.operator-chaining' = 'false';

CREATE CATALOG catalog_hive WITH (
    'type' = 'hive',
    ...
);

CREATE TABLE kafka_source (
    item1 STRING,
    item2 INT,
    item3 STRING,
    PRIMARY KEY (item1, item2) NOT ENFORCED,
    process_time AS PROCTIME()
    -- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    ...
);

CREATE TABLE blackhole_sink (
    comp STRING,
    `order` STRING,
    order_line INT,
    order_sequence INT,
    material_code STRING,
    warehouse_code STRING,
    quantity DOUBLE
) WITH (
    'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT a.item1, b.comp
FROM kafka_source a
LEFT JOIN catalog_hive.db.hive_lookup_tb
    /*+ OPTIONS('streaming-source.partition.include' = 'latest',
                'streaming-source.monitor-interval' = '60 min',
                'streaming-source.enable' = 'true') */
    FOR SYSTEM_TIME AS OF kafka_source.process_time AS b
    ON a.item1 = b.comp;
```
The error stack is as follows:
java.io.IOException: java.io.IOException: 
java.lang.ArrayIndexOutOfBoundsException: 10
    at 
com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700)
 ~[classes/:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_332]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 ~[hadoop-common-2.7.5.jar:?]

  was:
Here is my SQL:
```
SET 'execution.runtime-mode' = 'streaming';
SET 'table.dynamic-table-options.enabled' = 'true';
SET 'table.exec.source.cdc-events-duplicate' = 'false';
SET 'pipeline.operator-chaining' = 'false';

CREATE CATALOG catalog_hive WITH (
    'type' = 'hive',
    ...
);

CREATE TABLE kafka_source (
    item1 STRING,
    item2 INT,
    item3 STRING,
    PRIMARY KEY (item1, item2) NOT ENFORCED,
    process_time AS PROCTIME()
    -- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    ...
);

CREATE TABLE blackhole_sink (
    comp STRING,
    `order` STRING,
    order_line INT,
    order_sequence INT,
    material_code STRING,
    warehouse_code STRING,
    quantity DOUBLE
) WITH (
    'connector' = 'blackhole'
);

INSERT INTO dwd_pd_purchase_received_detail_arctic_rt_180502_test
SELECT a.item1, b.comp
FROM kafka_source a
LEFT JOIN catalog_hive.db.hive_lookup_tb
    /*+ OPTIONS('streaming-source.partition.include' = 'latest',
                'streaming-source.monitor-interval' = '60 min',
                'streaming-source.enable' = 'true') */
    FOR SYSTEM_TIME AS OF kafka_source.process_time AS b
    ON a.item1 = b.comp;
```
The error stack is as follows:
java.io.IOException: java.io.IOException: 
java.lang.ArrayIndexOutOfBoundsException: 10
    at 
com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700)
 ~[classes/:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_332]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 ~[hadoop-common-2.7.5.jar:?]


> Hive Lookup Join ProjectPushDown will encounter ArrayIndexOutOfBoundsException
> -
>
> Key: FLINK-33732
> URL: https://issues.apache.org/jira/browse/FLINK-33732
> Project: Flink
>  Issue Type: Bug
>Reporter: lekelei
>Priority: Major
>
> Here is my SQL:
> ```
> SET 'execution.runtime-mode' = 'streaming';
> SET 'table.dynamic-table-options.enabled' = 'true';
> SET 'table.exec.source.cdc-events-duplicate' = 'false';
> SET 'pipeline.operator-chaining' = 'false';
>
> CREATE CATALOG catalog_hive WITH (
>     'type' = 'hive',
>     ...
> );
>
> CREATE TABLE kafka_source (
>     item1 STRING,
>     item2 INT,
>     item3 STRING,
>     PRIMARY KEY (item1, item2) NOT ENFORCED,
>     process_time AS PROCTIME()
>     -- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'format' = 'json',
>     ...
> );
>
> CREATE TABLE blackhole_sink (
>     comp STRING,
>     `order` STRING,
>     order_line INT,
>     order_sequence INT,
>     material_code STRING,
>     warehouse_code STRING,
>     quantity DOUBLE
> ) WITH (
>     'connector' = 'blackhole'
> );
>
> INSERT INTO blackhole_sink
> SELECT a.item1, b.comp
> FROM kafka_source a
> LEFT JOIN catalog_hive.db.hive_lookup_tb
>     /*+ OPTIONS('streaming-source.partition.include' = 'latest',
>                 'streaming-source.monitor-interval' = '60 min',
>                 'streaming-source.enable' = 'true') */
>     FOR SYSTEM_TIME AS OF kafka_source.process_time AS b
>     ON a.item1 = b.comp;
> ```
> The error stack is as follows:
> java.io.IOException: java.io.IOException: 
> java.lang.ArrayIndexOutOfBoundsException: 10
>     at 
> com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700)
>  ~[classes/:?]
>     at java.security.AccessController.doPrivileged(Native Method) 
> ~[?:1.8.0_332]
>     at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
>     at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  ~[hadoop-common-2.7.5.jar:?]

[jira] [Created] (FLINK-33732) Hive Lookup Join ProjectPushDown will encounter ArrayIndexOutOfBoundsException

2023-12-03 Thread lekelei (Jira)
lekelei created FLINK-33732:
---

 Summary: Hive Lookup Join ProjectPushDown will encounter ArrayIndexOutOfBoundsException
 Key: FLINK-33732
 URL: https://issues.apache.org/jira/browse/FLINK-33732
 Project: Flink
  Issue Type: Bug
Reporter: lekelei


Here is my SQL:
```
SET 'execution.runtime-mode' = 'streaming';
SET 'table.dynamic-table-options.enabled' = 'true';
SET 'table.exec.source.cdc-events-duplicate' = 'false';
SET 'pipeline.operator-chaining' = 'false';

CREATE CATALOG catalog_hive WITH (
    'type' = 'hive',
    ...
);

CREATE TABLE kafka_source (
    item1 STRING,
    item2 INT,
    item3 STRING,
    PRIMARY KEY (item1, item2) NOT ENFORCED,
    process_time AS PROCTIME()
    -- WATERMARK FOR `ts` AS ts - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    ...
);

CREATE TABLE blackhole_sink (
    comp STRING,
    `order` STRING,
    order_line INT,
    order_sequence INT,
    material_code STRING,
    warehouse_code STRING,
    quantity DOUBLE
) WITH (
    'connector' = 'blackhole'
);

INSERT INTO dwd_pd_purchase_received_detail_arctic_rt_180502_test
SELECT a.item1, b.comp
FROM kafka_source a
LEFT JOIN catalog_hive.db.hive_lookup_tb
    /*+ OPTIONS('streaming-source.partition.include' = 'latest',
                'streaming-source.monitor-interval' = '60 min',
                'streaming-source.enable' = 'true') */
    FOR SYSTEM_TIME AS OF kafka_source.process_time AS b
    ON a.item1 = b.comp;
```
The error stack is as follows:
java.io.IOException: java.io.IOException: 
java.lang.ArrayIndexOutOfBoundsException: 10
    at 
com.netease.sloth.flink.connector.filesystem.table.meta.TableMetaStore.lambda$runSecured$1(TableMetaStore.java:700)
 ~[classes/:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_332]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
 ~[hadoop-common-2.7.5.jar:?]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-15656) Support user-specified pod templates

2022-11-29 Thread lekelei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640626#comment-17640626
 ] 

lekelei edited comment on FLINK-15656 at 11/29/22 11:21 AM:


[~wangyang0918] 

Hello, I am using the pod template feature and I have a question: with the 
pod-template.yaml file provided in the reference document [Native Kubernetes | 
Apache Flink|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/#pod-template], 
I can download the dependent jar files into the /opt/flink/artifact directory 
through the init container, but how is the Flink jobmanager supposed to load 
these jar packages? Mounting the volume directly over the Flink jobmanager's lib 
directory via emptyDir would overwrite the original lib directory, so that is 
not a viable approach. I hope you can help me with this if you have time, thank 
you very much!
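
For reference, a minimal pod-template sketch of one possible approach (the 
image, jar name, and download URL below are illustrative placeholders, and it 
assumes application mode, where Flink adds jars under /opt/flink/usrlib to the 
user classpath): let the init container fetch the artifacts into a shared 
emptyDir and mount that volume at /opt/flink/usrlib instead of over lib, so the 
distribution's own lib directory stays intact.

```
# Sketch, not a verified recipe: image, jar name, and URL are placeholders.
# The main container must keep the name "flink-main-container" so Flink merges it.
apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-pod-template
spec:
  initContainers:
    - name: artifacts-fetcher
      image: busybox:latest
      # Fetch the dependency into the shared volume instead of the lib directory.
      command: ["wget", "-O", "/opt/flink/usrlib/my-udf.jar", "https://example.com/my-udf.jar"]
      volumeMounts:
        - name: flink-usrlib
          mountPath: /opt/flink/usrlib
  containers:
    - name: flink-main-container
      volumeMounts:
        # Mount at usrlib, leaving /opt/flink/lib untouched.
        - name: flink-usrlib
          mountPath: /opt/flink/usrlib
  volumes:
    - name: flink-usrlib
      emptyDir: {}
```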


was (Author: JIRAUSER297957):
[~wangyang0918] 

Hello, I am using the pod template feature and I have a question: with the 
pod-template.yaml file provided in the reference document [Native Kubernetes | 
Apache Flink|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/#pod-template], 
I can download the dependent jar files into the /opt/flink/artifact directory 
through the init container, but how is the Flink jobmanager supposed to load 
these jar packages? Mounting the volume directly over the Flink jobmanager's lib 
directory via emptyDir would overwrite the original lib directory, so that is 
not a viable approach. I hope you can help me with this if you have time, thank 
you very much!

> Support user-specified pod templates
> 
>
> Key: FLINK-15656
> URL: https://issues.apache.org/jira/browse/FLINK-15656
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Assignee: Yang Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> The current approach of introducing a new configuration option for each aspect 
> of the pod specification a user might wish to control is becoming unwieldy: we 
> have to maintain more and more Flink-side Kubernetes configuration options, and 
> users have to learn the gap between the declarative model used by Kubernetes 
> and the configuration model used by Flink. It would be a great improvement to 
> allow users to specify pod templates as a central place for all customization 
> needs for the jobmanager and taskmanager pods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-15656) Support user-specified pod templates

2022-11-29 Thread lekelei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640626#comment-17640626
 ] 

lekelei commented on FLINK-15656:
-

[~wangyang0918] 

Hello, I am using the pod template feature and I have a question: with the 
pod-template.yaml file provided in the reference document [Native Kubernetes | 
Apache Flink|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/#pod-template], 
I can download the dependent jar files into the /opt/flink/artifact directory 
through the init container, but how is the Flink jobmanager supposed to load 
these jar packages? Mounting the volume directly over the Flink jobmanager's lib 
directory via emptyDir would overwrite the original lib directory, so that is 
not a viable approach. I hope you can help me with this if you have time, thank 
you very much!

> Support user-specified pod templates
> 
>
> Key: FLINK-15656
> URL: https://issues.apache.org/jira/browse/FLINK-15656
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Assignee: Yang Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> The current approach of introducing a new configuration option for each aspect 
> of the pod specification a user might wish to control is becoming unwieldy: we 
> have to maintain more and more Flink-side Kubernetes configuration options, and 
> users have to learn the gap between the declarative model used by Kubernetes 
> and the configuration model used by Flink. It would be a great improvement to 
> allow users to specify pod templates as a central place for all customization 
> needs for the jobmanager and taskmanager pods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-15656) Support user-specified pod templates

2022-11-29 Thread lekelei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640626#comment-17640626
 ] 

lekelei edited comment on FLINK-15656 at 11/29/22 11:18 AM:


[~wangyang0918] 

Hello, I am using the pod template feature and I have a question: with the 
pod-template.yaml file provided in the reference document [Native Kubernetes | 
Apache Flink|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/#pod-template], 
I can download the dependent jar files into the /opt/flink/artifact directory 
through the init container, but how is the Flink jobmanager supposed to load 
these jar packages? Mounting the volume directly over the Flink jobmanager's lib 
directory via emptyDir would overwrite the original lib directory, so that is 
not a viable approach. I hope you can help me with this if you have time, thank 
you very much!


was (Author: JIRAUSER297957):
[~wangyang0918] 

Hello, I am using the pod template feature and I have a question: with the 
pod-template.yaml file provided in the reference document [Native Kubernetes | 
Apache Flink|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/#pod-template], 
I can download the dependent jar files into the /opt/flink/artifact directory 
through the init container, but how is the Flink jobmanager supposed to load 
these jar packages? Mounting the volume directly over the Flink jobmanager's lib 
directory via emptyDir would overwrite the original lib directory, so that is 
not a viable approach. I hope you can help me with this if you have time, thank 
you very much!

> Support user-specified pod templates
> 
>
> Key: FLINK-15656
> URL: https://issues.apache.org/jira/browse/FLINK-15656
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Assignee: Yang Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> The current approach of introducing a new configuration option for each aspect 
> of the pod specification a user might wish to control is becoming unwieldy: we 
> have to maintain more and more Flink-side Kubernetes configuration options, and 
> users have to learn the gap between the declarative model used by Kubernetes 
> and the configuration model used by Flink. It would be a great improvement to 
> allow users to specify pod templates as a central place for all customization 
> needs for the jobmanager and taskmanager pods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)