[jira] [Commented] (FLINK-28404) Annotation @InjectClusterClient does not work correctly with RestClusterClient

2022-07-05 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562636#comment-17562636
 ] 

zl commented on FLINK-28404:


For example, {{ReactiveModeITCase}} uses {{RestClusterClient}} to get job details; see
[ReactiveModeITCase.java#L202|https://github.com/apache/flink/blob/a89152713aa58647841c46ed2335b45d24c553f9/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java#L202].
When we port {{ReactiveModeITCase}} to JUnit 5, we would need to inject a
{{RestClusterClient}}.

[MiniClusterExtension.java#L107|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java#L107]
provides an example of how to inject a {{RestClusterClient}}, but it does not work correctly.
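The failure can be reproduced without Flink at all. The sketch below uses hypothetical stand-in classes (not Flink's real ones) to model the kind of type check {{MiniClusterExtension#supportsParameter}} effectively performs: an exact-type comparison against the base client type rejects the subtype, while an {{isAssignableFrom}} check on the declared parameter type would accept it.

```java
// Hypothetical stand-ins (NOT Flink's real classes) modeling the check in
// MiniClusterExtension#supportsParameter. An exact-type comparison against the
// base client type rejects the subtype RestClusterClient, which JUnit 5 then
// reports as "No ParameterResolver registered". An isAssignableFrom check on
// the declared parameter type accepts both the base type and the subtype.
class ClusterClient {}
class RestClusterClient extends ClusterClient {}

public class SupportsParameterCheck {
    // Roughly how the buggy extension behaves: only the exact base type matches.
    static boolean strictCheck(Class<?> parameterType) {
        return parameterType == ClusterClient.class;
    }

    // A possible fix: accept any declared type that the injected
    // RestClusterClient instance can be assigned to.
    static boolean assignableCheck(Class<?> parameterType) {
        return parameterType.isAssignableFrom(RestClusterClient.class);
    }

    public static void main(String[] args) {
        System.out.println(strictCheck(RestClusterClient.class));     // false
        System.out.println(assignableCheck(RestClusterClient.class)); // true
        System.out.println(assignableCheck(ClusterClient.class));     // true
    }
}
```

This is only a model of the type-resolution logic, not the extension's actual code; the real fix belongs in {{MiniClusterExtension#supportsParameter}}.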

> Annotation @InjectClusterClient does not work correctly with RestClusterClient
> --
>
> Key: FLINK-28404
> URL: https://issues.apache.org/jira/browse/FLINK-28404
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: zl
>Priority: Major
>
> *test code:*
> {code:java}
> public class Test {
>
>     @RegisterExtension
>     private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
>             new MiniClusterExtension(
>                     new MiniClusterResourceConfiguration.Builder()
>                             .setNumberTaskManagers(1)
>                             .setNumberSlotsPerTaskManager(4)
>                             .build());
>
>     @org.junit.jupiter.api.Test
>     void test(@InjectClusterClient RestClusterClient restClusterClient) throws Exception {
>         Object clusterId = restClusterClient.getClusterId();
>     }
> }
> {code}
> *error info:*
> {code:java}
> org.junit.jupiter.api.extension.ParameterResolutionException: No ParameterResolver registered for parameter [org.apache.flink.client.program.rest.RestClusterClient arg0] in method...
> {code}
> This problem occurs because
> [MiniClusterExtension#supportsParameter|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java#L168]
> does not support the {{RestClusterClient}} parameter type.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28404) Annotation @InjectClusterClient does not work correctly with RestClusterClient

2022-07-05 Thread zl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zl updated FLINK-28404:
---
Summary: Annotation @InjectClusterClient does not work correctly with 
RestClusterClient  (was: Annotation @InjectClusterClient do not work correctly)




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28404) Annotation @InjectClusterClient do not work correctly

2022-07-05 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562617#comment-17562617
 ] 

zl commented on FLINK-28404:


Hi [~rmetzger], can you confirm this problem?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28404) Annotation @InjectClusterClient do not work correctly

2022-07-05 Thread zl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zl updated FLINK-28404:
---
Description: 
*test code:*
{code:java}
public class Test {

@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(4)
.build());

@org.junit.jupiter.api.Test
void test(@InjectClusterClient RestClusterClient restClusterClient) 
throws Exception {
Object clusterId = restClusterClient.getClusterId();
}
} {code}
*error info:*
{code:java}
org.junit.jupiter.api.extension.ParameterResolutionException: No 
ParameterResolver registered for parameter 
[org.apache.flink.client.program.rest.RestClusterClient arg0] in method... 
{code}
this problem occurs because 
[MiniClusterExtension#supportsParameter|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java#L168]
 does not support *_RestClusterClient_* parameterType. 

  was:
*test code:*
{code:java}
public class Test {

@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(4)
.build());

@org.junit.jupiter.api.Test
void test(@InjectClusterClient RestClusterClient restClusterClient) 
throws Exception {
Object clusterId = restClusterClient.getClusterId();
}
} {code}
*error info:*
{code:java}
org.junit.jupiter.api.extension.ParameterResolutionException: No 
ParameterResolver registered for parameter 
[org.apache.flink.client.program.rest.RestClusterClient arg0] in method... 
{code}
 

this problem occurs because 
[MiniClusterExtension#supportsParameter|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java#L168]
 does not support *_RestClusterClient_* parameterType. 





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28404) Annotation @InjectClusterClient do not work correctly

2022-07-05 Thread zl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zl updated FLINK-28404:
---
Description: 
*test code:*
{code:java}
public class Test {

@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(4)
.build());

@org.junit.jupiter.api.Test
void test(@InjectClusterClient RestClusterClient restClusterClient) 
throws Exception {
Object clusterId = restClusterClient.getClusterId();
}
} {code}
*error info:*
{code:java}
org.junit.jupiter.api.extension.ParameterResolutionException: No 
ParameterResolver registered for parameter 
[org.apache.flink.client.program.rest.RestClusterClient arg0] in method... 
{code}
 

this problem occurs because 
[MiniClusterExtension#supportsParameter|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java#L168]
 does not support *_RestClusterClient_* parameterType. 

  was:
*test code:*

 
{code:java}
public class Test {

@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(4)
.build());

@org.junit.jupiter.api.Test
void test(@InjectClusterClient RestClusterClient restClusterClient) 
throws Exception {
Object clusterId = restClusterClient.getClusterId();
}
} {code}
*error info:*

 

 
{code:java}
org.junit.jupiter.api.extension.ParameterResolutionException: No 
ParameterResolver registered for parameter 
[org.apache.flink.client.program.rest.RestClusterClient arg0] in method... 
{code}
this problem occurs because 
[MiniClusterExtension#supportsParameter|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java#L168]
 does not support *_RestClusterClient_* parameterType. 

 





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28404) Annotation @InjectClusterClient do not work correctly

2022-07-05 Thread zl (Jira)
zl created FLINK-28404:
--

 Summary: Annotation @InjectClusterClient do not work correctly
 Key: FLINK-28404
 URL: https://issues.apache.org/jira/browse/FLINK-28404
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: zl


*test code:*
{code:java}
public class Test {

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());

    @org.junit.jupiter.api.Test
    void test(@InjectClusterClient RestClusterClient restClusterClient) throws Exception {
        Object clusterId = restClusterClient.getClusterId();
    }
}
{code}
*error info:*
{code:java}
org.junit.jupiter.api.extension.ParameterResolutionException: No ParameterResolver registered for parameter [org.apache.flink.client.program.rest.RestClusterClient arg0] in method...
{code}
This problem occurs because
[MiniClusterExtension#supportsParameter|https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java#L168]
does not support the {{RestClusterClient}} parameter type.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28274) ContinuousFileMonitoringFunction doesn't work with reactive mode

2022-07-05 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562433#comment-17562433
 ] 

zl commented on FLINK-28274:


"Without an incentive for users to migrate, they will never do so." Couldn't agree more :)

> ContinuousFileMonitoringFunction doesn't work with reactive mode
> 
>
> Key: FLINK-28274
> URL: https://issues.apache.org/jira/browse/FLINK-28274
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Robert Metzger
>Assignee: zl
>Priority: Major
>  Labels: pull-request-available
>
> This issue was first reported in the Flink Slack: 
> https://apache-flink.slack.com/archives/C03G7LJTS2G/p1656257678477659
> It seems that reactive mode is changing the parallelism of the 
> `ContinuousFileMonitoringFunction`, which is supposed to always run with a 
> parallelism of 1.
> This is the error
> {code}
> INITIALIZING to FAILED with failure cause: 
> java.lang.IllegalArgumentException: ContinuousFileMonitoringFunction 
> retrieved invalid state.
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> {code}
> You can see from the logs that the parallelism is changing on a rescale event:
> {code}
> 2022-06-27 13:38:54,979 | INFO  | .executiongraph.ExecutionGraph  | Source: 
> Custom File Source (1/1) (cbaad20beee908b95c9fe5c34ba76bfa) switched from 
> RUNNING to CANCELING.
> 2022-06-27 13:38:55,254 | INFO  | .executiongraph.ExecutionGraph  | Source: 
> Custom File Source (1/1) (cbaad20beee908b95c9fe5c34ba76bfa) switched from 
> CANCELING to CANCELED.
> 2022-06-27 13:38:55,657 | INFO  | .executiongraph.ExecutionGraph  | Source: 
> Custom File Source (2/2) (6ceaacbe8d9aa507b0a56c850082da8c) switched from 
> DEPLOYING to INITIALIZING.
> 2022-06-27 13:38:55,722 | INFO  | .executiongraph.ExecutionGraph  | Source: 
> Custom File Source (2/2) (6ceaacbe8d9aa507b0a56c850082da8c) switched from 
> INITIALIZING to RUNNING.
> 2022-06-27 13:44:54,058 | INFO  | .executiongraph.ExecutionGraph  | Source: 
> Custom File Source (1/2) (665b12194741744d6bba4408a252fa45) switched from 
> RUNNING to CANCELING.
> 2022-06-27 13:45:00,825 | INFO  | .executiongraph.ExecutionGraph  | Source: 
> Custom File Source (1/2) (3cc408fd0eb9ddfa97b22f4dfc09d8dc) switched from 
> DEPLOYING to INITIALIZING.
> 2022-06-27 13:45:00,826 | INFO  | .executiongraph.ExecutionGraph  | Source: 
> Custom File Source (1/2) (3cc408fd0eb9ddfa97b22f4dfc09d8dc) switched from 
> INITIALIZING to RUNNING.
> 2022-06-27 13:45:01,434 | INFO  | .executiongraph.ExecutionGraph  | Source: 
> Custom File Source (2/2) (79338042d84b6458c34760bc85145512) switched from 
> DEPLOYING to INITIALIZING.
> 2022-06-27 13:45:02,427 | INFO  | .executiongraph.ExecutionGraph  | Source: 
> Custom File Source (2/2) (79338042d84b6458c34760bc85145512) switched from 
> INITIALIZING to RUNNING.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28274) ContinuousFileMonitoringFunction doesn't work with reactive mode

2022-07-05 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562419#comment-17562419
 ] 

zl commented on FLINK-28274:


-- Does this problem only occur for {{ContinuousFileMonitoringFunction}}, or also for the new {{FileSource}}?

As [~wanglijie95] said, with FLIP-27 the new {{FileSource}} no longer needs {{ContinuousFileMonitoringFunction}} to play the role of the SplitEnumerator, so this problem won't occur for the new {{FileSource}}.

-- Does it also occur with other operators whose parallelism must be 1?

From what I understand, if the max parallelism is not set to 1, this problem may also occur for other operators whose parallelism must be 1.

-- Why fix it if it only occurs in a legacy component?

The legacy sources are still widely used. For example, when reading files with the DataStream API, the first thought is to use the readFile()/readTextFile() methods, especially for starters. Since the fix won't take much effort, it may be worth doing.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28274) ContinuousFileMonitoringFunction doesn't work with reactive mode

2022-06-30 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17561061#comment-17561061
 ] 

zl commented on FLINK-28274:


Hi [~rmetzger], could you assign this ticket to me before I open a PR?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28274) ContinuousFileMonitoringFunction doesn't work with reactive mode

2022-06-30 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17561034#comment-17561034
 ] 

zl commented on FLINK-28274:


-- The fix is either ensuring that the maxParallelism for the monitoring function is set correctly, or somehow telling the adaptive scheduler not to change the parallelism of that function (in a generic way).

I prefer to set the max parallelism for the monitoring function correctly. Since {{ContinuousFileMonitoringFunction}} is a legacy source function, we'd better not change the adaptive scheduler for this issue.

[AdaptiveScheduler.java#L344|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L344]
calculates the maxParallelism for a vertex only if no max parallelism was configured, so if we ensure that the maxParallelism of the monitoring function is 1, reactive mode will not change the parallelism of the {{ContinuousFileMonitoringFunction}}.

Since {{ContinuousFileMonitoringFunction}} is only used in
[StreamExecutionEnvironment.java#L1855|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1855],
it's safe to explicitly set the max parallelism there.
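The reasoning above can be sketched with a simplified, hypothetical model (this is not the actual AdaptiveScheduler code): once the vertex's max parallelism is explicitly pinned to 1, a reactive-mode rescale can no longer raise the monitoring function's parallelism.

```java
// Simplified, hypothetical model of the rescale decision discussed above.
// The real AdaptiveScheduler derives a default max parallelism only when none
// was configured; here we only show the capping effect of an explicit bound.
public class RescaleBound {
    static int effectiveParallelism(int desiredParallelism, int maxParallelism) {
        // A vertex can never run with more subtasks than its max parallelism.
        return Math.min(desiredParallelism, maxParallelism);
    }

    public static void main(String[] args) {
        // Without the fix: a derived max parallelism (e.g. 128) lets reactive
        // mode rescale the monitoring function to 2 subtasks, breaking it.
        System.out.println(effectiveParallelism(2, 128)); // 2
        // With the fix: max parallelism explicitly set to 1, rescale is capped.
        System.out.println(effectiveParallelism(2, 1));   // 1
    }
}
```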




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28324) JUnit5 Migration] Module: flink-sql-client

2022-06-30 Thread zl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zl updated FLINK-28324:
---
Parent: FLINK-25325
Issue Type: Sub-task  (was: Improvement)

> JUnit5 Migration] Module: flink-sql-client
> --
>
> Key: FLINK-28324
> URL: https://issues.apache.org/jira/browse/FLINK-28324
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Client
>Reporter: zl
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28324) JUnit5 Migration] Module: flink-sql-client

2022-06-30 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17560953#comment-17560953
 ] 

zl commented on FLINK-28324:


Hi [~zhuzh], could you assign this ticket to me?





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28324) JUnit5 Migration] Module: flink-sql-client

2022-06-30 Thread zl (Jira)
zl created FLINK-28324:
--

 Summary: JUnit5 Migration] Module: flink-sql-client
 Key: FLINK-28324
 URL: https://issues.apache.org/jira/browse/FLINK-28324
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Reporter: zl






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28262) The select result does not match the sink table

2022-06-27 Thread zl (Jira)


zl commented on FLINK-28262:


From what I understand, `int_two as int_two0` in the SELECT clause has nothing to do with field mapping. If you want to specify the mapping by name instead of by position, you can do it like this: `insert into sink (...) select ...`


--
This message was sent by Atlassian Jira
(v8.20.10#820010-sha1:ace47f9)


[jira] [Commented] (FLINK-28105) We should test the copied object in GlobFilePathFilterTest#testGlobFilterSerializable

2022-06-17 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555457#comment-17555457
 ] 

zl commented on FLINK-28105:


Hi [~zhuzh], can you take a look?

> We should test the copied object in 
> GlobFilePathFilterTest#testGlobFilterSerializable
> -
>
> Key: FLINK-28105
> URL: https://issues.apache.org/jira/browse/FLINK-28105
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Reporter: zl
>Priority: Minor
>
> Variable
> [matcherCopy|https://github.com/apache/flink/blob/master/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java#L170]
>  is created but never verified by the test.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28105) We should test the copied object in GlobFilePathFilterTest#testGlobFilterSerializable

2022-06-17 Thread zl (Jira)
zl created FLINK-28105:
--

 Summary: We should test the copied object in 
GlobFilePathFilterTest#testGlobFilterSerializable
 Key: FLINK-28105
 URL: https://issues.apache.org/jira/browse/FLINK-28105
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Reporter: zl


Variable 
[matcherCopy|https://github.com/apache/flink/blob/master/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java#L170]
 is created but never tested.
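The fix is simply to run the assertions against the deserialized copy rather than the original object. A self-contained sketch of the pattern, using plain `java.io` serialization; `SuffixFilter` and `copySerializable` are illustrative stand-ins for GlobFilePathFilter and Flink's test utilities, not the actual classes:

```java
import java.io.*;

public class SerializableCopyDemo {
    // Round-trip an object through Java serialization, as a test helper would.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copySerializable(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    // A tiny serializable filter standing in for GlobFilePathFilter.
    static class SuffixFilter implements Serializable {
        private final String suffix;
        SuffixFilter(String suffix) { this.suffix = suffix; }
        boolean accepts(String path) { return path.endsWith(suffix); }
    }

    public static void main(String[] args) throws Exception {
        SuffixFilter original = new SuffixFilter(".csv");
        SuffixFilter copy = copySerializable(original);
        // The point of the ticket: exercise the COPY, not the original.
        if (!copy.accepts("data/file.csv") || copy.accepts("data/file.txt")) {
            throw new AssertionError("deserialized copy does not behave like the original");
        }
    }
}
```

If the assertions only touch the original object, a broken `readObject`/`writeObject` implementation would never be noticed.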





[jira] [Commented] (FLINK-28018) the start index to create empty splits in BinaryInputFormat#createInputSplits is inappropriate

2022-06-13 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17553481#comment-17553481
 ] 

zl commented on FLINK-28018:


Thanks for confirming this, [~zhuzh]. I'd like to fix it.

> the start index to create empty splits in BinaryInputFormat#createInputSplits 
> is inappropriate
> --
>
> Key: FLINK-28018
> URL: https://issues.apache.org/jira/browse/FLINK-28018
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.15.0, 1.14.4, 1.16.0
>Reporter: zl
>Priority: Major
> Fix For: 1.16.0, 1.15.2
>
>
> When the number of created splits is smaller than the minimum desired number 
> of file splits, 
> [BinaryInputFormat.java#L150|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java#L150]
>  uses `files.size()` as the start index to create empty splits. That is 
> inappropriate; the start index should be `inputSplits.size()`.





[jira] [Commented] (FLINK-28018) the start index to create empty splits in BinaryInputFormat#createInputSplits is inappropriate

2022-06-13 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17553449#comment-17553449
 ] 

zl commented on FLINK-28018:


Hi [~zhuzh], can you take a look?

> the start index to create empty splits in BinaryInputFormat#createInputSplits 
> is inappropriate
> --
>
> Key: FLINK-28018
> URL: https://issues.apache.org/jira/browse/FLINK-28018
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Reporter: zl
>Priority: Major
>
> When the number of created splits is smaller than the minimum desired number 
> of file splits, 
> [BinaryInputFormat.java#L150|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java#L150]
>  uses `files.size()` as the start index to create empty splits. That is 
> inappropriate; the start index should be `inputSplits.size()`.





[jira] [Created] (FLINK-28018) the start index to create empty splits in BinaryInputFormat#createInputSplits is inappropriate

2022-06-13 Thread zl (Jira)
zl created FLINK-28018:
--

 Summary: the start index to create empty splits in 
BinaryInputFormat#createInputSplits is inappropriate
 Key: FLINK-28018
 URL: https://issues.apache.org/jira/browse/FLINK-28018
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Reporter: zl


When the number of created splits is smaller than the minimum desired number of 
file splits, 
[BinaryInputFormat.java#L150|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java#L150]
 uses `files.size()` as the start index to create empty splits. That is 
inappropriate; the start index should be `inputSplits.size()`.
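A self-contained sketch of the indexing issue (the names are simplified stand-ins for the fields in BinaryInputFormat#createInputSplits, and integer indices stand in for the split objects): when two files have already produced three splits, padding from `files.size()` (here 2) would reuse index 2, while padding from the split list's own size continues the sequence correctly:

```java
import java.util.*;

public class EmptySplitPaddingDemo {
    // Pad the split index list up to minNumSplits with empty splits.
    // The new indices must continue from inputSplits.size(), not from numFiles.
    static List<Integer> padSplitIndices(int numFiles, List<Integer> inputSplits, int minNumSplits) {
        List<Integer> splits = new ArrayList<>(inputSplits);
        // The buggy version started at numFiles: for (int id = numFiles; ...)
        for (int id = splits.size(); id < minNumSplits; id++) {
            splits.add(id); // an empty split carrying this index
        }
        return splits;
    }

    public static void main(String[] args) {
        // 2 files produced 3 splits; the scheduler wants at least 5 splits.
        List<Integer> result = padSplitIndices(2, Arrays.asList(0, 1, 2), 5);
        System.out.println(result); // [0, 1, 2, 3, 4] - no gap, no duplicate
    }
}
```

Starting at `numFiles` would emit indices 2, 3, 4 on top of the existing 0, 1, 2, producing a duplicate split index.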





[jira] [Commented] (FLINK-26883) Bump dependency-check-maven to 2.10.1

2022-03-28 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17513353#comment-17513353
 ] 

zl commented on FLINK-26883:


cc [~chesnay], wdyt?

> Bump dependency-check-maven to 2.10.1
> -
>
> Key: FLINK-26883
> URL: https://issues.apache.org/jira/browse/FLINK-26883
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: zl
>Priority: Major
>
> When running *_mvn org.owasp:dependency-check-maven:aggregate_*, the 
> following error occurred:
> {code:java}
> IO Exception connecting to 
> https://nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2019.json.gz: HEAD request 
> returned a non-200 status code: 
> https://nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2019.json.gz 
> .. {code}
> That's because org.owasp:dependency-check-maven:5.0.0-M2 in 
> _*flink-parent/pom.xml*_ is outdated and its data feed is unavailable. We may 
> need to bump dependency-check-maven to a newer version, like 7.0.1.
> I reran *_mvn org.owasp:dependency-check-maven:aggregate_* with 
> org.owasp:dependency-check-maven:7.0.1, and it works well.





[jira] [Created] (FLINK-26883) Bump dependency-check-maven to 2.10.1

2022-03-28 Thread zl (Jira)
zl created FLINK-26883:
--

 Summary: Bump dependency-check-maven to 2.10.1
 Key: FLINK-26883
 URL: https://issues.apache.org/jira/browse/FLINK-26883
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: zl


When running *_mvn org.owasp:dependency-check-maven:aggregate_*, the following 
error occurred:

{code:java}
IO Exception connecting to 
https://nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2019.json.gz: HEAD request 
returned a non-200 status code: 
https://nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2019.json.gz 
.. {code}

That's because org.owasp:dependency-check-maven:5.0.0-M2 in 
_*flink-parent/pom.xml*_ is outdated and its data feed is unavailable. We may 
need to bump dependency-check-maven to a newer version, like 7.0.1.

I reran *_mvn org.owasp:dependency-check-maven:aggregate_* with 
org.owasp:dependency-check-maven:7.0.1, and it works well.
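The bump itself would be a one-line change to the plugin declaration in flink-parent/pom.xml; a sketch of what it might look like (the exact version to pick was still under discussion):

```xml
<plugin>
  <groupId>org.owasp</groupId>
  <artifactId>dependency-check-maven</artifactId>
  <!-- was 5.0.0-M2, whose NVD data feed is no longer reachable -->
  <version>7.0.1</version>
</plugin>
```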





[jira] [Comment Edited] (FLINK-26692) migrate TpcdsTestProgram.java to new source

2022-03-18 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508639#comment-17508639
 ] 

zl edited comment on FLINK-26692 at 3/18/22, 8:50 AM:
--

Hi [~zhuzh], thanks for assigning this ticket to me. -When working on this 
issue, I met another problem, FLINK-26722. It makes the validation of tpcds 
results fail. Maybe we should fix FLINK-26722 first.-
 

With the option *_csv.null-literal = ""_*, we can solve this problem. Thanks 
[~wanglijie95] 


was (Author: leo zhou):
-Hi- [~zhuzh] -, thanks for assigning this ticket to me.  when working on this 
issue, I met another problem FLINK-26722, It makes the validation of tpcds 
results failed. Maybe we should fix FLINK-26722 first.-
 

with option *_csv.null-literal = "",_* we can solve this problem, thanks 
[~wanglijie95] 

> migrate TpcdsTestProgram.java to new source
> ---
>
> Key: FLINK-26692
> URL: https://issues.apache.org/jira/browse/FLINK-26692
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.15.0
>Reporter: zl
>Assignee: zl
>Priority: Major
> Fix For: 1.16.0
>
>
> [run-nightly-tests.sh#L220|https://github.com/apache/flink/blob/master/flink-end-to-end-tests/run-nightly-tests.sh#L220]
>  run TpcdsTestProgram which uses the legacy source with 
> AdaptiveBatchScheduler, since there are some known issues (FLINK-26576 , 
> FLINK-26548 )about legacy source, I think we should migrate TpcdsTestProgram 
> to new source asap.





[jira] [Comment Edited] (FLINK-26692) migrate TpcdsTestProgram.java to new source

2022-03-18 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508639#comment-17508639
 ] 

zl edited comment on FLINK-26692 at 3/18/22, 8:48 AM:
--

-Hi- [~zhuzh] -, thanks for assigning this ticket to me.  when working on this 
issue, I met another problem FLINK-26722, It makes the validation of tpcds 
results failed. Maybe we should fix FLINK-26722 first.-
 

with option *_csv.null-literal = "",_* we can solve this problem, thanks 
[~wanglijie95] 


was (Author: leo zhou):
Hi [~zhuzh] , thanks for assigning this ticket to me.  when working on this 
issue, I met another problem FLINK-26722, It makes the validation of tpcds 
results failed. Maybe we should fix [FLINK-26722] first.
 

> migrate TpcdsTestProgram.java to new source
> ---
>
> Key: FLINK-26692
> URL: https://issues.apache.org/jira/browse/FLINK-26692
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.15.0
>Reporter: zl
>Assignee: zl
>Priority: Major
> Fix For: 1.16.0
>
>
> [run-nightly-tests.sh#L220|https://github.com/apache/flink/blob/master/flink-end-to-end-tests/run-nightly-tests.sh#L220]
>  run TpcdsTestProgram which uses the legacy source with 
> AdaptiveBatchScheduler, since there are some known issues (FLINK-26576 , 
> FLINK-26548 )about legacy source, I think we should migrate TpcdsTestProgram 
> to new source asap.





[jira] [Comment Edited] (FLINK-26722) the result is wrong when using file connector with csv format

2022-03-18 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508641#comment-17508641
 ] 

zl edited comment on FLINK-26722 at 3/18/22, 8:24 AM:
--

I think it has something to do with field parsing.

When using CsvTableSource, we use 
[RowCsvInputFormat|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java]
 for reading data and 
[StringParser#parseField|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java#L47]
 for parsing string fields. When a string field is empty ("") and 
_*emptyColumnAsNull*_ is enabled, 
[RowCsvInputFormat#L221|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L221]
 will set the field value to null.

When using the new file connector with csv format, we use 
[CsvReaderFormat|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java]
 for reading data and 
[CsvToRowDataConverters#convertToString|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L271]
 for parsing string fields. When a string field is empty (""), 
[CsvToRowDataConverters.java#L109|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L109]
 will set the field value to the empty string ("").

I think the way that CsvTableSource treats the empty string may be more 
reasonable, and the new file source with csv format should be consistent with 
it. That means when *_csv.ignore-parse-errors_* is enabled, 
[CsvToRowDataConverters#convertToString|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L271]
 should convert the empty string ("") to null; otherwise it should keep ("") as 
("").
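The proposed behavior can be sketched as follows. This is a minimal stand-in, not the actual CsvToRowDataConverters API; the `nullLiteral` parameter mimics the `csv.null-literal` option that was eventually used to work around the problem (a column equal to the literal becomes null):

```java
public class CsvEmptyStringDemo {
    // Convert a raw CSV column to its string value.
    // A column equal to nullLiteral is treated as SQL NULL; everything else
    // passes through unchanged. With nullLiteral = "", this matches the
    // legacy RowCsvInputFormat behavior when emptyColumnAsNull is enabled.
    static String convertToString(String rawColumn, String nullLiteral) {
        if (rawColumn.equals(nullLiteral)) {
            return null;
        }
        return rawColumn;
    }

    public static void main(String[] args) {
        System.out.println(convertToString("", ""));     // null: empty column as NULL
        System.out.println(convertToString("abc", ""));  // non-empty values untouched
        System.out.println(convertToString("", "\\N"));  // "" kept when the literal differs
    }
}
```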


was (Author: leo zhou):
I think it has something to do with field parsing.
 
when use CsvTableSource, we use 
[RowCsvInputFormat|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java]
 for reading data and 
[StringParser#parseField|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java#L47]
 for parsing string field. when a string field is empty(""), if 
emptyColumnAsNull is enabled, 
[RowCsvInputFormat#L221|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L221]will
 set the field value to null.

when use the new file connector with csv format, we use 
[CsvReaderFormat|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java]
 for reading data and 
[CsvToRowDataConverters#convertToString|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L271]
 for parsing string field. when a string field is empty(""), 
[CsvToRowDataConverters.java#L109|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L109]will
 set the field value to empty string ("").
 
I think the way that CsvTableSource treats empty string may be more reasonable, 
the new file source with csv format should be consistent with it. It means that 
when *_csv.ignore-parse-errors_* is enabled, 
[CsvToRowDataConverters#convertToString|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L271]
 covert empty string ("") to null, otherwise convert  ("") to  ("").

> the result is wrong when using file connector with csv format
> -
>
> Key: FLINK-26722
> URL: https://issues.apache.org/jira/browse/FLINK-26722
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: zl
>Priority: Major
> Attachments: CsvTest1.java, example.csv, 
> image-2022-03-18-15-32-28-914.png
>
>
> CsvTest1.java execute a same query on a same dataset (Attachment example.csv) 
> with CsvTableSource and the new file connector respectively,  but the result 
> is different. The results are as follows:
> !image-2022-03-18-15-32-28-914.png!





[jira] [Comment Edited] (FLINK-26722) the result is wrong when using file connector with csv format

2022-03-18 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508641#comment-17508641
 ] 

zl edited comment on FLINK-26722 at 3/18/22, 8:20 AM:
--

I think it has something to do with field parsing.
 
when use CsvTableSource, we use 
[RowCsvInputFormat|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java]
 for reading data and 
[StringParser#parseField|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java#L47]
 for parsing string field. when a string field is empty(""), if 
emptyColumnAsNull is enabled, 
[RowCsvInputFormat#L221|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L221]will
 set the field value to null.

when use the new file connector with csv format, we use 
[CsvReaderFormat|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java]
 for reading data and 
[CsvToRowDataConverters#convertToString|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L271]
 for parsing string field. when a string field is empty(""), 
[CsvToRowDataConverters.java#L109|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L109]will
 set the field value to empty string ("").
 
I think the way that CsvTableSource treats empty string may be more reasonable, 
the new file source with csv format should be consistent with it. It means that 
when *_csv.ignore-parse-errors_* is enabled, 
[CsvToRowDataConverters#convertToString|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L271]
 covert empty string ("") to null, otherwise convert  ("") to  ("").


was (Author: leo zhou):
I think it has something to do with field parsing.
 
when use CsvTableSource, we use 
[RowCsvInputFormat|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java]
 for reading data and 
[StringParser#parseField|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java#L47]
 for parsing string field. when a string field is empty(""), if 
emptyColumnAsNull is enabled, 
[RowCsvInputFormat#L221|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L221]will
 set the field value to null.

when use the new file connector with csv format, we use 
[CsvReaderFormat|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java]
 for reading data and 
[CsvToRowDataConverters#convertToString|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L271]
 for parsing string field. when a string field is empty(""), 
[CsvToRowDataConverters.java#L109|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L109]will
 set the field value to empty string ("").
 
I think the way that CsvTableSource treats empty string may be more reasonable, 
the new file source with csv format should be consistent with it

> the result is wrong when using file connector with csv format
> -
>
> Key: FLINK-26722
> URL: https://issues.apache.org/jira/browse/FLINK-26722
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: zl
>Priority: Major
> Attachments: CsvTest1.java, example.csv, 
> image-2022-03-18-15-32-28-914.png
>
>
> CsvTest1.java execute a same query on a same dataset (Attachment example.csv) 
> with CsvTableSource and the new file connector respectively,  but the result 
> is different. The results are as follows:
> !image-2022-03-18-15-32-28-914.png!





[jira] [Commented] (FLINK-26722) the result is wrong when using file connector with csv format

2022-03-18 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508641#comment-17508641
 ] 

zl commented on FLINK-26722:


I think it has something to do with field parsing.

When using CsvTableSource, we use 
[RowCsvInputFormat|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java]
 for reading data and 
[StringParser#parseField|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java#L47]
 for parsing string fields. When a string field is empty ("") and 
emptyColumnAsNull is enabled, 
[RowCsvInputFormat#L221|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L221]
 will set the field value to null.

When using the new file connector with csv format, we use 
[CsvReaderFormat|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java]
 for reading data and 
[CsvToRowDataConverters#convertToString|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L271]
 for parsing string fields. When a string field is empty (""), 
[CsvToRowDataConverters.java#L109|https://github.com/apache/flink/blob/master/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L109]
 will set the field value to the empty string ("").

I think the way that CsvTableSource treats the empty string may be more 
reasonable, and the new file source with csv format should be consistent with it.

> the result is wrong when using file connector with csv format
> -
>
> Key: FLINK-26722
> URL: https://issues.apache.org/jira/browse/FLINK-26722
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: zl
>Priority: Major
> Attachments: CsvTest1.java, example.csv, 
> image-2022-03-18-15-32-28-914.png
>
>
> CsvTest1.java execute a same query on a same dataset (Attachment example.csv) 
> with CsvTableSource and the new file connector respectively,  but the result 
> is different. The results are as follows:
> !image-2022-03-18-15-32-28-914.png!





[jira] [Commented] (FLINK-26692) migrate TpcdsTestProgram.java to new source

2022-03-18 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508639#comment-17508639
 ] 

zl commented on FLINK-26692:


Hi [~zhuzh], thanks for assigning this ticket to me. When working on this 
issue, I met another problem, FLINK-26722. It makes the validation of tpcds 
results fail. Maybe we should fix [FLINK-26722] first.
 

> migrate TpcdsTestProgram.java to new source
> ---
>
> Key: FLINK-26692
> URL: https://issues.apache.org/jira/browse/FLINK-26692
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.15.0
>Reporter: zl
>Assignee: zl
>Priority: Major
> Fix For: 1.16.0
>
>
> [run-nightly-tests.sh#L220|https://github.com/apache/flink/blob/master/flink-end-to-end-tests/run-nightly-tests.sh#L220]
>  run TpcdsTestProgram which uses the legacy source with 
> AdaptiveBatchScheduler, since there are some known issues (FLINK-26576 , 
> FLINK-26548 )about legacy source, I think we should migrate TpcdsTestProgram 
> to new source asap.





[jira] [Created] (FLINK-26722) the result is wrong when using file connector with csv format

2022-03-18 Thread zl (Jira)
zl created FLINK-26722:
--

 Summary: the result is wrong when using file connector with csv 
format
 Key: FLINK-26722
 URL: https://issues.apache.org/jira/browse/FLINK-26722
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: zl
 Attachments: CsvTest1.java, example.csv, 
image-2022-03-18-15-32-28-914.png

CsvTest1.java executes the same query on the same dataset (attachment 
example.csv) with CsvTableSource and the new file connector respectively, but 
the results differ. The results are as follows:

!image-2022-03-18-15-32-28-914.png!





[jira] [Commented] (FLINK-26692) migrate TpcdsTestProgram.java to new source

2022-03-16 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17507938#comment-17507938
 ] 

zl commented on FLINK-26692:


Hi [~wanglijie95], can you assign this ticket to me?

> migrate TpcdsTestProgram.java to new source
> ---
>
> Key: FLINK-26692
> URL: https://issues.apache.org/jira/browse/FLINK-26692
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: zl
>Priority: Major
>
> [run-nightly-tests.sh#L220|https://github.com/apache/flink/blob/master/flink-end-to-end-tests/run-nightly-tests.sh#L220]
>  run TpcdsTestProgram which uses the legacy source with 
> AdaptiveBatchScheduler, since there are some known issues (FLINK-26576 , 
> FLINK-26548 )about legacy source, I think we should migrate TpcdsTestProgram 
> to new source asap.





[jira] [Created] (FLINK-26692) migrate TpcdsTestProgram.java to new source

2022-03-16 Thread zl (Jira)
zl created FLINK-26692:
--

 Summary: migrate TpcdsTestProgram.java to new source
 Key: FLINK-26692
 URL: https://issues.apache.org/jira/browse/FLINK-26692
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: zl


[run-nightly-tests.sh#L220|https://github.com/apache/flink/blob/master/flink-end-to-end-tests/run-nightly-tests.sh#L220]
 runs TpcdsTestProgram, which uses the legacy source, with 
AdaptiveBatchScheduler. Since there are some known issues (FLINK-26576, 
FLINK-26548) with the legacy source, I think we should migrate TpcdsTestProgram 
to the new source asap.





[jira] [Commented] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-10 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504241#comment-17504241
 ] 

zl commented on FLINK-26548:


Hi [~zhuzh], thanks for assigning this ticket to me. I have fixed this issue 
via [PR-19040|https://github.com/apache/flink/pull/19040].

> the source parallelism is not set correctly with AdaptiveBatchScheduler
> ---
>
> Key: FLINK-26548
> URL: https://issues.apache.org/jira/browse/FLINK-26548
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.15.0
>Reporter: zl
>Assignee: zl
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.15.0
>
> Attachments: image-2022-03-09-19-00-18-396.png
>
>
> When running *_org.apache.flink.table.tpcds.TpcdsTestProgram_* with 
> *_AdaptiveBatchScheduler_*, I ran into a problem: the number of records sent 
> by the source operator is always 1, and the parallelism of the source 
> operator is also 1 even if I set 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 8.
> !image-2022-03-09-19-00-18-396.png!
> After some research, I found that operator A is not the actual file reader; 
> it just splits files and assigns splits to downstream tasks for further 
> processing, and operator B is the actual file reader task. Here, the 
> parallelism of operator B is 64, and the number of records sent by operator 
> A is 1. This means operator A assigned all splits to a single task of 
> operator B; *_the other 63 tasks of operator B are idle_*, which is 
> unreasonable.
> In this case, the parallelism of operator B should be 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* and the 
> number of records sent by operator A should also be 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*.
> 





[jira] [Comment Edited] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-10 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504182#comment-17504182
 ] 

zl edited comment on FLINK-26548 at 3/10/22, 10:57 AM:
---

Hi [~wanglijie95], thanks for replying.

 

-> this problem only occurs when using legacy file sources, other sources and 
the new source (FLIP-27) will not have two operators.

Yes, this problem only occurs when using legacy file sources.

 

We can solve this problem in this way: before creating 
{{ContinuousFileMonitoringFunction}}, we get the value of 
*_default.parallelism_*; if the value is -1 and AdaptiveBatchScheduler is 
enabled, we pass the value of 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 
{{ContinuousFileMonitoringFunction}}. Then we need to set the parallelism of 
the real source reader to 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*.

btw, can you assign this issue to me?


was (Author: leo zhou):
Hi [~wanglijie95],  thanks for replying.

 

-> this problem only occurs when using legacy file sources, other sources and 
the new source(FLIP-27) will not have two operators.

yes,  this problem only occurs when using legacy file sources

 

We can solve this problem in this way: before create 
{{ContinuousFileMonitoringFunction}}, we get the value of 
*_default.parallelism_*, if the value is -1 and AdaptiveBatchScheduler is 
enabled, we pass the value of 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 
{{ContinuousFileMonitoringFunction}}. Then we need to set the 
parallelism of real source reader to 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*.

> the source parallelism is not set correctly with AdaptiveBatchScheduler
> ---
>
> Key: FLINK-26548
> URL: https://issues.apache.org/jira/browse/FLINK-26548
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.15.0
>Reporter: zl
>Priority: Blocker
> Fix For: 1.15.0
>
> Attachments: image-2022-03-09-19-00-18-396.png
>
>
> When running *_org.apache.flink.table.tpcds.TpcdsTestProgram_* with 
> {_}*AdaptiveBatchScheduler*{_}, I ran into a problem:the num of records sent 
> by the source operator is always 1, and the parallelism of source operator is 
> also 1 even I set 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 8.
> !image-2022-03-09-19-00-18-396.png!
> After some research, I found that the operator A is not the actual file 
> reader, it just splits files and assigns splits to downstream tasks for 
> further processing, and the operator B is the actual file reader task. Here, 
> the parallelism of operator B is 64, and the records sent by operator A is 1, 
> this means, operator A assigned all splits to a task of operator B, {*}_the 
> other 63 tasks of operator B is idle_{*}, it is unreasonable.
> In this case,  the parallelism of operator B should be 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*  and the 
> num of records sent by operator A also should be 
> {*}_jobmanager.adaptive-batch-scheduler.default-source-parallelism_{*}.
>  





[jira] [Commented] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-10 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17504182#comment-17504182
 ] 

zl commented on FLINK-26548:


Hi [~wanglijie95],  thanks for replying.

 

-> this problem only occurs when using legacy file sources, other sources and 
the new source(FLIP-27) will not have two operators.

yes,  this problem only occurs when using legacy file sources

 

We can solve this problem in this way: before creating 
{{ContinuousFileMonitoringFunction}}, we get the value of 
*_default.parallelism_*; if the value is -1 and AdaptiveBatchScheduler is 
enabled, we pass the value of 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 
{{ContinuousFileMonitoringFunction}}. Then we need to set the parallelism of 
the real source reader to 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*.
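The fallback described above can be sketched as follows. This is a simplified stand-in, not the actual Flink code: in reality both values would be read from Flink's Configuration, and -1 is the usual "not set" marker for `parallelism.default`:

```java
public class SourceParallelismDemo {
    // Decide the parallelism of the actual file reader task.
    // defaultParallelism: the configured parallelism.default (-1 means "not set").
    // adaptiveSourceParallelism: jobmanager.adaptive-batch-scheduler.default-source-parallelism.
    static int resolveReaderParallelism(
            int defaultParallelism,
            boolean adaptiveBatchSchedulerEnabled,
            int adaptiveSourceParallelism) {
        // Only fall back to the scheduler option when no explicit default is set.
        if (defaultParallelism == -1 && adaptiveBatchSchedulerEnabled) {
            return adaptiveSourceParallelism;
        }
        return defaultParallelism;
    }

    public static void main(String[] args) {
        // AdaptiveBatchScheduler enabled, no explicit default -> scheduler option wins.
        System.out.println(resolveReaderParallelism(-1, true, 8)); // 8
        // An explicit default parallelism takes precedence.
        System.out.println(resolveReaderParallelism(4, true, 8));  // 4
    }
}
```

With this resolution, the split assigner would distribute splits across all reader subtasks instead of sending every split to one task.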

> the source parallelism is not set correctly with AdaptiveBatchScheduler
> ---
>
> Key: FLINK-26548
> URL: https://issues.apache.org/jira/browse/FLINK-26548
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.15.0
>Reporter: zl
>Priority: Blocker
> Fix For: 1.15.0
>
> Attachments: image-2022-03-09-19-00-18-396.png
>
>
> When running *_org.apache.flink.table.tpcds.TpcdsTestProgram_* with 
> *_AdaptiveBatchScheduler_*, I ran into a problem: the number of records sent 
> by the source operator is always 1, and the parallelism of the source 
> operator is also 1 even though I set 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 8.
> !image-2022-03-09-19-00-18-396.png!
> After some research, I found that operator A is not the actual file reader; 
> it just splits files and assigns splits to downstream tasks for further 
> processing, while operator B is the actual file reader task. Here, the 
> parallelism of operator B is 64 and operator A sent only 1 record, which 
> means operator A assigned all splits to a single task of operator B; *_the 
> other 63 tasks of operator B are idle_*, which is unreasonable.
> In this case, the parallelism of operator B should be 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* and the 
> number of records sent by operator A should also be 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-09 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503510#comment-17503510
 ] 

zl commented on FLINK-26548:


Hi [~wanglijie95], can you confirm this problem?

> the source parallelism is not set correctly with AdaptiveBatchScheduler
> ---
>
> Key: FLINK-26548
> URL: https://issues.apache.org/jira/browse/FLINK-26548
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.15.0
>Reporter: zl
>Priority: Major
> Attachments: image-2022-03-09-19-00-18-396.png
>
>
> When running *_org.apache.flink.table.tpcds.TpcdsTestProgram_* with 
> *_AdaptiveBatchScheduler_*, I ran into a problem: the number of records sent 
> by the source operator is always 1, and the parallelism of the source 
> operator is also 1 even though I set 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 8.
> !image-2022-03-09-19-00-18-396.png!
> After some research, I found that operator A is not the actual file reader; 
> it just splits files and assigns splits to downstream tasks for further 
> processing, while operator B is the actual file reader task. Here, the 
> parallelism of operator B is 64 and operator A sent only 1 record, which 
> means operator A assigned all splits to a single task of operator B; *_the 
> other 63 tasks of operator B are idle_*, which is unreasonable.
> In this case, the parallelism of operator B should be 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* and the 
> number of records sent by operator A should also be 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26548) the source parallelism is not set correctly with AdaptiveBatchScheduler

2022-03-09 Thread zl (Jira)
zl created FLINK-26548:
--

 Summary: the source parallelism is not set correctly with 
AdaptiveBatchScheduler
 Key: FLINK-26548
 URL: https://issues.apache.org/jira/browse/FLINK-26548
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.15.0
Reporter: zl
 Attachments: image-2022-03-09-19-00-18-396.png

When running *_org.apache.flink.table.tpcds.TpcdsTestProgram_* with 
*_AdaptiveBatchScheduler_*, I ran into a problem: the number of records sent by 
the source operator is always 1, and the parallelism of the source operator is 
also 1 even though I set 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 8.

!image-2022-03-09-19-00-18-396.png!

After some research, I found that operator A is not the actual file reader; it 
just splits files and assigns splits to downstream tasks for further 
processing, while operator B is the actual file reader task. Here, the 
parallelism of operator B is 64 and operator A sent only 1 record, which means 
operator A assigned all splits to a single task of operator B; *_the other 63 
tasks of operator B are idle_*, which is unreasonable.

In this case, the parallelism of operator B should be 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* and the 
number of records sent by operator A should also be 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*.

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-24070) Support different over window aggregates in streaming queries

2021-08-31 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407286#comment-17407286
 ] 

zl edited comment on FLINK-24070 at 8/31/21, 12:13 PM:
---

Hi [~lzljs3620320], f2 is a time field, and the code above just shows two 
different over windows in one query statement. The difference between the two 
over windows is that the first uses _*2 PRECEDING*_ rows and the second uses 
*_5 PRECEDING_* rows. Flink does not support this kind of statement.


was (Author: leo zhou):
Hi [~lzljs3620320], f2 is a time field, and the code above just shows two 
different over windows in one query statement. The difference between the two 
over windows is that the first uses _*2 PRECEDING*_ rows and the second uses 
*_5 PRECEDING_* rows.

> Support different over window aggregates in streaming queries
> -
>
> Key: FLINK-24070
> URL: https://issues.apache.org/jira/browse/FLINK-24070
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: zl
>Priority: Major
>
> Currently, Flink does not support using different over window aggregates in a 
> query statement, such as:
>  
> {code:sql}
> select 
> f1, f2, f3, 
> sum(f3) over (PARTITION BY f1 ORDER BY f2 ROWS BETWEEN 2 PRECEDING AND 
> CURRENT ROW) as sum_1,
> sum(f3) over (PARTITION BY f1 ORDER BY f2 ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW) as sum_2
> from table_a;{code}
>  
> However, this feature is commonly used in feature engineering for generating 
> new features. Is there a plan to support it?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24070) Support different over window aggregates in streaming queries

2021-08-31 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17407286#comment-17407286
 ] 

zl commented on FLINK-24070:


Hi [~lzljs3620320], f2 is a time field, and the code above just shows two 
different over windows in one query statement. The difference between the two 
over windows is that the first uses _*2 PRECEDING*_ rows and the second uses 
*_5 PRECEDING_* rows.

> Support different over window aggregates in streaming queries
> -
>
> Key: FLINK-24070
> URL: https://issues.apache.org/jira/browse/FLINK-24070
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: zl
>Priority: Major
>
> Currently, Flink does not support using different over window aggregates in a 
> query statement, such as:
>  
> {code:sql}
> select 
> f1, f2, f3, 
> sum(f3) over (PARTITION BY f1 ORDER BY f2 ROWS BETWEEN 2 PRECEDING AND 
> CURRENT ROW) as sum_1,
> sum(f3) over (PARTITION BY f1 ORDER BY f2 ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW) as sum_2
> from table_a;{code}
>  
> However, this feature is commonly used in feature engineering for generating 
> new features. Is there a plan to support it?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24070) Support different over window aggregates in streaming queries

2021-08-31 Thread zl (Jira)
zl created FLINK-24070:
--

 Summary: Support different over window aggregates in streaming 
queries
 Key: FLINK-24070
 URL: https://issues.apache.org/jira/browse/FLINK-24070
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: zl


Currently, Flink does not support using different over window aggregates in a 
query statement, such as:

 
{code:sql}
select 
f1, f2, f3, 
sum(f3) over (PARTITION BY f1 ORDER BY f2 ROWS BETWEEN 2 PRECEDING AND 
CURRENT ROW) as sum_1,
sum(f3) over (PARTITION BY f1 ORDER BY f2 ROWS BETWEEN 5 PRECEDING AND 
CURRENT ROW) as sum_2
from table_a;{code}
 

However, this feature is commonly used in feature engineering for generating 
new features. Is there a plan to support it?

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21657) flink doc fails to display a picture

2021-03-07 Thread zl (Jira)
zl created FLINK-21657:
--

 Summary: flink doc fails to display a picture 
 Key: FLINK-21657
 URL: https://issues.apache.org/jira/browse/FLINK-21657
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: zl
 Attachments: image-2021-03-08-15-07-01-698.png

The section [Once More, With 
Streaming!|https://ci.apache.org/projects/flink/flink-docs-master/docs/try-flink/table_api/#once-more-with-streaming]
 fails to display a picture.

 

!image-2021-03-08-15-07-01-698.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21501) Sync Chinese documentation with FLINK-21343

2021-02-28 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17292574#comment-17292574
 ] 

zl commented on FLINK-21501:


Hi [~dwysakowicz], I can do this work. Could you assign this issue to me?

> Sync Chinese documentation with FLINK-21343
> ---
>
> Key: FLINK-21501
> URL: https://issues.apache.org/jira/browse/FLINK-21501
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> We should update the Chinese documentation with changes introduced in 
> FLINK-21343



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18640) Fix PostgresDialect doesn't quote the identifiers

2020-10-29 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17222725#comment-17222725
 ] 

zl commented on FLINK-18640:


Maybe we can add a new property named `schema` to the JDBC connector to 
distinguish the schema name from the table name; then table names containing 
`.` could also be supported. BTW, are you still working on this issue [~良]? If 
no one is working on it, could you assign this ticket to me? [~jark]

> Fix PostgresDialect doesn't quote the identifiers
> -
>
> Key: FLINK-18640
> URL: https://issues.apache.org/jira/browse/FLINK-18640
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.9.1, 1.10.1
>Reporter: 毛宗良
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.10.3, 1.11.3
>
>
> Flink JDBC throws exceptions when reading a PostgreSQL table with a schema, 
> like "ods.t_test". By debugging the source code, I found a bug in how the 
> table name is handled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19829) should enclose column name in double quotes for PostgreSQL

2020-10-28 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17222101#comment-17222101
 ] 

zl commented on FLINK-19829:


Hi [~jark], could you take a look at this issue?

> should enclose column name in double quotes for PostgreSQL
> --
>
> Key: FLINK-19829
> URL: https://issues.apache.org/jira/browse/FLINK-19829
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Reporter: zl
>Priority: Major
>
> When I run the following SQL in Flink:
> {code:sql}
> create table pg_sink ( 
> Name VARCHAR, 
> address VARCHAR, 
> work VARCHAR) 
> with (   
> 'connector' = 'jdbc',   
> 'url' = 'jdbc:postgresql://***:***/***',    
> 'table-name' = 'pg_sink', 
> ...
> ) 
> create table kafka_source(    
> Name VARCHAR,    
> address VARCHAR,    
> work VARCHAR
> ) with (    
> 'connector.type' = 'kafka',    
> 'format.type' = 'json',
> ...
> )
> insert into pg_sink select * from kafka_source{code}
> the following exception happens:
> {code:java}
> Caused by: org.postgresql.util.PSQLException: ERROR: column "Name" of 
> relation "pg_sink" does not exist
> ...{code}
> We can solve this problem by removing the method *_quoteIdentifier_* in 
> *_PostgresDialect.java_*; then the method *_quoteIdentifier_* in 
> _*JdbcDialect.java*_ will be used to enclose column names in double quotes 
> for PostgreSQL.
> Could you assign this issue to me?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19829) should enclose column name in double quotes for PostgreSQL

2020-10-27 Thread zl (Jira)
zl created FLINK-19829:
--

 Summary: should enclose column name in double quotes for PostgreSQL
 Key: FLINK-19829
 URL: https://issues.apache.org/jira/browse/FLINK-19829
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Reporter: zl


When I run the following SQL in Flink:
{code:sql}
create table pg_sink ( 
Name VARCHAR, 
address VARCHAR, 
work VARCHAR) 
with (   
'connector' = 'jdbc',   
'url' = 'jdbc:postgresql://***:***/***',    
'table-name' = 'pg_sink', 
...
) 

create table kafka_source(    
Name VARCHAR,    
address VARCHAR,    
work VARCHAR
) with (    
'connector.type' = 'kafka',    
'format.type' = 'json',
...
)

insert into pg_sink select * from kafka_source{code}
the following exception happens:
{code:java}
Caused by: org.postgresql.util.PSQLException: ERROR: column "Name" of relation 
"pg_sink" does not exist
...{code}
We can solve this problem by removing the method *_quoteIdentifier_* in 
*_PostgresDialect.java_*; then the method *_quoteIdentifier_* in 
_*JdbcDialect.java*_ will be used to enclose column names in double quotes for 
PostgreSQL.

Could you assign this issue to me?
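The default double-quoting behavior described above can be sketched as follows. 
This is an illustrative sketch of the fallback the ticket wants to restore, not 
the actual JdbcDialect source:

{code:java}
public class QuoteIdentifierSketch {

    // Hedged sketch of the default JdbcDialect behavior: wrap identifiers in
    // double quotes so case-sensitive PostgreSQL column names such as "Name"
    // are preserved in generated statements.
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    public static void main(String[] args) {
        // e.g. INSERT INTO pg_sink ("Name", "address", "work") VALUES (...)
        System.out.println(quoteIdentifier("Name"));
    }
}
{code}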



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16699) Support accessing secured services via K8s secrets

2020-07-20 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160928#comment-17160928
 ] 

zl commented on FLINK-16699:


Thanks [~fly_in_gis], that will be very helpful.

> Support accessing secured services via K8s secrets
> --
>
> Key: FLINK-16699
> URL: https://issues.apache.org/jira/browse/FLINK-16699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Assignee: zl
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Kubernetes 
> [Secrets|https://kubernetes.io/docs/concepts/configuration/secret/] can be 
> used to provide credentials for a Flink application to access secured 
> services. This ticket proposes to:
>  # Support mounting user-specified K8s Secrets into the 
> JobManager/TaskManager container
>  # Support using a user-specified K8s Secret through an environment variable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16699) Support accessing secured services via K8s secrets

2020-07-14 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17157347#comment-17157347
 ] 

zl commented on FLINK-16699:


Thanks [~felixzheng], that will be helpful.

> Support accessing secured services via K8s secrets
> --
>
> Key: FLINK-16699
> URL: https://issues.apache.org/jira/browse/FLINK-16699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Assignee: zl
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Kubernetes 
> [Secrets|https://kubernetes.io/docs/concepts/configuration/secret/] can be 
> used to provide credentials for a Flink application to access secured 
> services. This ticket proposes to:
>  # Support mounting user-specified K8s Secrets into the 
> JobManager/TaskManager container
>  # Support using a user-specified K8s Secret through an environment variable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16699) Support accessing secured services via K8s secrets

2020-07-14 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17155815#comment-17155815
 ] 

zl edited comment on FLINK-16699 at 7/14/20, 8:42 AM:
--

Hi [~tison], I have some experience adding Kubernetes features to the FlinkPod 
in my work. Could you assign this ticket to me?


was (Author: leo zhou):
Hi [~felixzheng] [~fly_in_gis], I have some experience adding Kubernetes 
features to the FlinkPod in my work. Could you assign this ticket to me?

> Support accessing secured services via K8s secrets
> --
>
> Key: FLINK-16699
> URL: https://issues.apache.org/jira/browse/FLINK-16699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.12.0
>
>
> Kubernetes 
> [Secrets|https://kubernetes.io/docs/concepts/configuration/secret/] can be 
> used to provide credentials for a Flink application to access secured 
> services. This ticket proposes to:
>  # Support mounting user-specified K8s Secrets into the 
> JobManager/TaskManager container
>  # Support using a user-specified K8s Secret through an environment variable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16699) Support accessing secured services via K8s secrets

2020-07-12 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156442#comment-17156442
 ] 

zl commented on FLINK-16699:


Hi [~felixzheng] [~fly_in_gis], I have already finished this feature. Do you 
mind if I submit a PR under this issue?

> Support accessing secured services via K8s secrets
> --
>
> Key: FLINK-16699
> URL: https://issues.apache.org/jira/browse/FLINK-16699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.12.0
>
>
> Kubernetes 
> [Secrets|https://kubernetes.io/docs/concepts/configuration/secret/] can be 
> used to provide credentials for a Flink application to access secured 
> services. This ticket proposes to:
>  # Support mounting user-specified K8s Secrets into the 
> JobManager/TaskManager container
>  # Support using a user-specified K8s Secret through an environment variable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16699) Support accessing secured services via K8s secrets

2020-07-10 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17155815#comment-17155815
 ] 

zl edited comment on FLINK-16699 at 7/11/20, 5:43 AM:
--

Hi [~felixzheng] [~fly_in_gis], I have some experience adding Kubernetes 
features to the FlinkPod in my work. Could you assign this ticket to me?


was (Author: leo zhou):
Hi [~felixzheng], I have some experience adding Kubernetes features to the 
FlinkPod in my work. Could you assign this ticket to me?

> Support accessing secured services via K8s secrets
> --
>
> Key: FLINK-16699
> URL: https://issues.apache.org/jira/browse/FLINK-16699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.12.0
>
>
> Kubernetes 
> [Secrets|https://kubernetes.io/docs/concepts/configuration/secret/] can be 
> used to provide credentials for a Flink application to access secured 
> services. This ticket proposes to:
>  # Support mounting user-specified K8s Secrets into the 
> JobManager/TaskManager container
>  # Support using a user-specified K8s Secret through an environment variable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16699) Support accessing secured services via K8s secrets

2020-07-10 Thread zl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17155815#comment-17155815
 ] 

zl commented on FLINK-16699:


Hi [~felixzheng], I have some experience adding Kubernetes features to the 
FlinkPod in my work. Could you assign this ticket to me?

> Support accessing secured services via K8s secrets
> --
>
> Key: FLINK-16699
> URL: https://issues.apache.org/jira/browse/FLINK-16699
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.12.0
>
>
> Kubernetes 
> [Secrets|https://kubernetes.io/docs/concepts/configuration/secret/] can be 
> used to provide credentials for a Flink application to access secured 
> services. This ticket proposes to:
>  # Support mounting user-specified K8s Secrets into the 
> JobManager/TaskManager container
>  # Support using a user-specified K8s Secret through an environment variable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17368) exception message in PrometheusPushGatewayReporter

2020-04-24 Thread zl (Jira)
zl created FLINK-17368:
--

 Summary: exception message in PrometheusPushGatewayReporter 
 Key: FLINK-17368
 URL: https://issues.apache.org/jira/browse/FLINK-17368
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.10.0
Reporter: zl


When sending Flink metrics to the Prometheus PushGateway using 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter, there are a 
lot of exception messages in the TaskManager log. Here is the exception stack:
{code:java}
2020-04-23 18:16:44,927 WARN  
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter  - Failed to 
push metrics to PushGateway with jobName a517f2f8bb79b59abb5e596f34adca27, 
groupingKey {}.
java.io.IOException: Response code from 
http://10.3.71.136:9091/metrics/job/a517f2f8bb79b59abb5e596f34adca27 was 200
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:297)
at 
org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway.push(PushGateway.java:127)
at 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:109)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:441)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at 
java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
{code}
 

After investigation, I found that it's 
io.prometheus:simpleclient_pushgateway:0.3.0 that causes the exception. Before 
io.prometheus:simpleclient_pushgateway:0.8.0, 
io.prometheus.client.exporter.PushGateway#doRequest used response code 202 to 
decide whether a metric was successfully sent, so response code 200 indicated a 
failed transmission. Since io.prometheus:simpleclient_pushgateway:0.8.0, any 
2xx response code indicates a successful transmission.
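The two acceptance checks can be sketched as follows. This is an illustrative 
sketch of the behavior change, not the actual PushGateway source:

{code:java}
public class PushGatewayResponseCheck {

    // simpleclient_pushgateway < 0.8.0: only 202 was treated as success,
    // so a 200 response was logged as a failed push.
    static boolean isSuccessPre080(int responseCode) {
        return responseCode == 202;
    }

    // simpleclient_pushgateway >= 0.8.0: any 2xx response counts as success.
    static boolean isSuccessSince080(int responseCode) {
        return responseCode / 100 == 2;
    }
}
{code}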

 

After we changed the version of io.prometheus:simpleclient_pushgateway to 
0.8.0 in the flink-metrics-prometheus module, there have been no more such 
exceptions.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)