[GitHub] [flink] Aitozi commented on pull request #22369: [FLINK-31755][planner] Make SqlRowOperator to create row with PEEK_FIELDS_NO_EXPAND struct type

2023-04-07 Thread via GitHub


Aitozi commented on PR #22369:
URL: https://github.com/apache/flink/pull/22369#issuecomment-1500774206

   CC @lincoln-lil Can you please also help review this PR, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31588) The unaligned checkpoint type is wrong at subtask level

2023-04-07 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709823#comment-17709823
 ] 

Rui Fan commented on FLINK-31588:
-

Thanks for your feedback.
{quote} If checkpoint was unaligned, as it arrived unaligned, it should be 
reported as such, even if that particular subtask didn't persist any data.
{quote}
Sounds make sense. I will prepare this PR next week.

> The unaligned checkpoint type is wrong at subtask level
> ---
>
> Key: FLINK-31588
> URL: https://issues.apache.org/jira/browse/FLINK-31588
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Attachments: image-2023-03-23-18-45-01-535.png
>
>
> FLINK-20488 supported show checkpoint type for each subtask, and it based on 
> received `CheckpointOptions` and it's right.
> However, FLINK-27251 supported timeout aligned to unaligned checkpoint 
> barrier in the output buffers. It means the received `CheckpointOptions` can 
> be converted from aligned checkpoint to unaligned checkpoint.
> So, the unaligned checkpoint type may be wrong at subtask level. For example, 
> as shown in the figure below, Unaligned checkpoint type is false, but it is 
> actually Unaligned checkpoint (persisted data > 0).
>  
> !image-2023-03-23-18-45-01-535.png|width=1879,height=797!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] slfan1989 commented on a diff in pull request #22207: [FLINK-31510][yarn] Use getMemorySize instead of getMemory.

2023-04-07 Thread via GitHub


slfan1989 commented on code in PR #22207:
URL: https://github.com/apache/flink/pull/22207#discussion_r1160998952


##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -1945,8 +1938,4 @@ Map generateApplicationMasterEnv(
 Utils.setupYarnClassPath(this.yarnConfiguration, env);
 return env;
 }
-
-private String getDisplayMemory(long memoryMB) {

Review Comment:
   Sorry, it was my mistake, I lost part of the code when I rebase, I 
re-updated the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31588) The unaligned checkpoint type is wrong at subtask level

2023-04-07 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709783#comment-17709783
 ] 

Piotr Nowojski commented on FLINK-31588:


Sorry for late response, I've just found an old tab with WIP comment that I 
wanted to write, but somehow didn't send as something must have interrupted me 
:(

Thanks for reporting the issue. I see the problem. I think ideally we should 
try to keep the semantic of that flag in sync with what {{StreamTask}} was 
actually doing. If checkpoint was unaligned, as it arrived unaligned, it should 
be reported as such, even if that particular subtask didn't persist any data. 
Can we still achieve that? 

> The unaligned checkpoint type is wrong at subtask level
> ---
>
> Key: FLINK-31588
> URL: https://issues.apache.org/jira/browse/FLINK-31588
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0, 1.17.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Attachments: image-2023-03-23-18-45-01-535.png
>
>
> FLINK-20488 supported show checkpoint type for each subtask, and it based on 
> received `CheckpointOptions` and it's right.
> However, FLINK-27251 supported timeout aligned to unaligned checkpoint 
> barrier in the output buffers. It means the received `CheckpointOptions` can 
> be converted from aligned checkpoint to unaligned checkpoint.
> So, the unaligned checkpoint type may be wrong at subtask level. For example, 
> as shown in the figure below, Unaligned checkpoint type is false, but it is 
> actually Unaligned checkpoint (persisted data > 0).
>  
> !image-2023-03-23-18-45-01-535.png|width=1879,height=797!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-21949) Support collect to array aggregate function

2023-04-07 Thread Roland Johann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709755#comment-17709755
 ] 

Roland Johann edited comment on FLINK-21949 at 4/7/23 6:50 PM:
---

Is someone working on this one? If someone can provide moderate guidance I can 
support to implement it.

Especially how to implement aggregate functions that accept table aliases as 
argument is something I din't figure out yet.


was (Author: rolandjohann):
Is someone working on this one? If someone can provide moderate guidance I can 
support to implement it.

> Support collect to array aggregate function
> ---
>
> Key: FLINK-21949
> URL: https://issues.apache.org/jira/browse/FLINK-21949
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.12.0
>Reporter: Jiabao Sun
>Priority: Minor
>
> Some nosql databases like mongodb and elasticsearch support nested data types.
> Aggregating multiple rows into ARRAY is a common requirement.
> The CollectToArray function is similar to Collect, except that it returns 
> ARRAY instead of MULTISET.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-21949) Support collect to array aggregate function

2023-04-07 Thread Roland Johann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709755#comment-17709755
 ] 

Roland Johann commented on FLINK-21949:
---

Is someone working on this one? If someone can provide moderate guidance I can 
support to implement it.

> Support collect to array aggregate function
> ---
>
> Key: FLINK-21949
> URL: https://issues.apache.org/jira/browse/FLINK-21949
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.12.0
>Reporter: Jiabao Sun
>Priority: Minor
>
> Some nosql databases like mongodb and elasticsearch support nested data types.
> Aggregating multiple rows into ARRAY is a common requirement.
> The CollectToArray function is similar to Collect, except that it returns 
> ARRAY instead of MULTISET.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22370: [hotfix] Fix typos of comment

2023-04-07 Thread via GitHub


flinkbot commented on PR #22370:
URL: https://github.com/apache/flink/pull/22370#issuecomment-1500531387

   
   ## CI report:
   
   * 2d915808770bc98569b6c084f48bbbcddc74e88f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hustonic opened a new pull request, #22370: [hotfix] Fix typos of comment

2023-04-07 Thread via GitHub


hustonic opened a new pull request, #22370:
URL: https://github.com/apache/flink/pull/22370

   
   
   ## What is the purpose of the change
   
   Fix typos of comment


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hustonic closed pull request #22368: [hotfix] Fix incorrect comment.

2023-04-07 Thread via GitHub


hustonic closed pull request #22368: [hotfix] Fix incorrect comment.
URL: https://github.com/apache/flink/pull/22368


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] jeremy-degroot commented on a diff in pull request #20: [FLINK-29398][connector/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness

2023-04-07 Thread via GitHub


jeremy-degroot commented on code in PR #20:
URL: 
https://github.com/apache/flink-connector-kafka/pull/20#discussion_r1160854136


##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java:
##
@@ -319,6 +321,51 @@ public void 
testUsingCommittedOffsetsWithEarliestOrLatestOffsetResetStrategy(
 
assertThat(reader.consumer().position(partition)).isEqualTo(expectedOffset);
 }
 
+@Test
+public void testConsumerClientRackSupplier() {
+AtomicReference supplierCalled = new AtomicReference<>(false);
+String rackId = "use1-az1";
+Supplier rackIdSupplier =
+() -> {
+supplierCalled.set(true);
+return rackId;
+};
+Properties properties = new Properties();
+createReader(
+properties,
+UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
+rackIdSupplier);
+assertThat(supplierCalled.get()).isEqualTo(true);
+}
+
+@ParameterizedTest
+@NullAndEmptySource
+public void testSetConsumerClientRackIgnoresNullAndEmpty(String rackId) {
+Properties properties = new Properties();
+Supplier rackIdSupplier = () -> rackId;
+KafkaPartitionSplitReader reader =
+createReader(
+properties,
+
UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
+rackIdSupplier);
+reader.setConsumerClientRack(properties, rackIdSupplier);
+
assertThat(properties.containsKey(ConsumerConfig.CLIENT_RACK_CONFIG)).isFalse();
+}
+
+@Test
+public void testSetConsumerClientRackUsesCorrectParameter() {

Review Comment:
   You're right, I'll be pushing an update to the branch to do that shortly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31755) ROW function can not work with RewriteIntersectAllRule

2023-04-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31755:
---
Labels: pull-request-available  (was: )

> ROW function can not work with RewriteIntersectAllRule
> --
>
> Key: FLINK-31755
> URL: https://issues.apache.org/jira/browse/FLINK-31755
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
>
> Reproduce case:
> {code:java}
> create table row_sink (
>   `b` ROW
> ) with (
>   'connector' = 'values'
> )
> util.verifyRelPlanInsert(
> "INSERT INTO row_sink " +
>   "SELECT ROW(a, b) FROM complex_type_src intersect all " +
>   "SELECT ROW(c, d) FROM complex_type_src ")
> {code}
> It will fails with 
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Type mismatch:
> rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
> equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
> Difference:
> EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
>   ... 68 more
> {code}
> The reason is:
> ROW function will generates the {{FULLY_QUALIFIED}} type. But after the 
> {{RewriteIntersectAllRule}} optimization, it will produce the 
> {{PEEK_FIELDS_NO_EXPAND}}. So the volcano planner complains with type 
> mismatch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22369: [FLINK-31755][planner] Make SqlRowOperator to create row with PEEK_FIELDS_NO_EXPAND struct type

2023-04-07 Thread via GitHub


flinkbot commented on PR #22369:
URL: https://github.com/apache/flink/pull/22369#issuecomment-1500478242

   
   ## CI report:
   
   * b2d7c2f3c959d62f48cc64f4b3c898ebfbacbf15 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi opened a new pull request, #22369: aupdate

2023-04-07 Thread via GitHub


Aitozi opened a new pull request, #22369:
URL: https://github.com/apache/flink/pull/22369

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22368: [hotfix] Fix incorrect comment.

2023-04-07 Thread via GitHub


flinkbot commented on PR #22368:
URL: https://github.com/apache/flink/pull/22368#issuecomment-1500461161

   
   ## CI report:
   
   * 79facde5ed44dc1ffa2277ef53cb736602e20bd1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hustonic opened a new pull request, #22368: [hotfix] Fix incorrect comment.

2023-04-07 Thread via GitHub


hustonic opened a new pull request, #22368:
URL: https://github.com/apache/flink/pull/22368

   
   
   ## What is the purpose of the change
   
   Fix typos of comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Aitozi commented on pull request #22363: [FLINK-31301][planner] Add support of nested columns in column list o…

2023-04-07 Thread via GitHub


Aitozi commented on PR #22363:
URL: https://github.com/apache/flink/pull/22363#issuecomment-1500451435

   @lincoln-lil please take a look when you are free, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

2023-04-07 Thread via GitHub


reswqa commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1160799424


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -191,7 +192,12 @@ public Optional getRestEndpoint(String 
clusterId) {
 
 @Override
 public List getPodsWithLabels(Map labels) {
-final List podList = 
this.internalClient.pods().withLabels(labels).list().getItems();
+final List podList =
+this.internalClient
+.pods()
+.withLabels(labels)
+.list(new 
ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   ```suggestion
   .list(new 
ListOptionsBuilder().withResourceVersion("0").build())
   ```
   We'd better add some comments for this magic number.



##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -233,6 +239,7 @@ public KubernetesWatch watchPodsAndDoCallback(
 this.internalClient
 .pods()
 
.withLabels(labels)
+
.withResourceVersion("0")

Review Comment:
   Refer to previous comments.



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
 server.expect().get().withPath(path).andReturn(500, "Expected 
error").always();
 }
 
+protected void mockPodEventWithLabels(Map labels) {
+final Pod pod1 =
+new PodBuilder()
+.withNewMetadata()
+.withNamespace("test")
+.withName("tm_pod1")
+.withLabels(labels)
+.withResourceVersion("5668")
+.endMetadata()
+.build();
+// mock four kinds of events.
+server.expect()
+.withPath(
+
"/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2=0=true=true")
+.andUpgradeToWebSocket()
+.open()
+.waitFor(1000)

Review Comment:
   I'm not very familiar with this api. Can anyone tell me if this means we 
will definitely wait for `1` second or at most `1` second. If it is the former, 
it should definitely be prohibited because it will seriously slow down the 
execution time of `AZP`.



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
 .isEmpty();
 }
 
+@Test
+void testWatchPodsAndDoCallback() throws Exception {
+mockPodEventWithLabels(TESTING_LABELS);
+// the count latch for events.
+final CountDownLatch eventLatch = new CountDownLatch(4);
+this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
   It seems that we have too many class implements 
`FlinkKubeClient.WatchCallbackHandler` only for testing purpose. We'd better 
introduce a more general and reusable `TestingKubernetesPodCallbackHandler` as 
the first commit and rewrite all other impls like `NoOpWatchCallbackHandler` & 
`TestingCallbackHandler`. 
   
   Don't worry too much, if you don't know how to do this refactor, I can push 
this commit as example.



##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -191,7 +192,12 @@ public Optional getRestEndpoint(String 
clusterId) {
 
 @Override
 public List getPodsWithLabels(Map labels) {
-final List podList = 
this.internalClient.pods().withLabels(labels).list().getItems();
+final List podList =
+this.internalClient
+.pods()
+.withLabels(labels)
+.list(new 
ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   +1 for this.



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
 server.expect().get().withPath(path).andReturn(500, "Expected 
error").always();
 }
 
+protected void mockPodEventWithLabels(Map labels) {
+final Pod pod1 =
+new PodBuilder()
+.withNewMetadata()
+.withNamespace("test")
+.withName("tm_pod1")
+.withLabels(labels)
+.withResourceVersion("5668")
+

[GitHub] [flink-connector-kafka] jeremy-degroot commented on a diff in pull request #20: [FLINK-29398][connector/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness

2023-04-07 Thread via GitHub


jeremy-degroot commented on code in PR #20:
URL: 
https://github.com/apache/flink-connector-kafka/pull/20#discussion_r1160774759


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##
@@ -80,11 +80,20 @@ public KafkaPartitionSplitReader(
 Properties props,
 SourceReaderContext context,
 KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
+this(props, context, kafkaSourceReaderMetrics, () -> null);
+}
+
+public KafkaPartitionSplitReader(
+Properties props,
+SourceReaderContext context,
+KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
+Supplier rackIdSupplier) {
 this.subtaskId = context.getIndexOfSubtask();
 this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
 Properties consumerProps = new Properties();
 consumerProps.putAll(props);
 consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
createConsumerClientId(props));
+setConsumerClientRack(consumerProps, rackIdSupplier);

Review Comment:
   I looked into this, and while it's certainly possible to do it that way the 
testing path is much more complex. Since the Supplier would have to be resolved 
in another Supplier that's passed to the Reader, 
testing that the behavior is as expected in the actual execution path is 
difficult. In KafkaPartitionSplitReader, we can call the constructor directly 
and verify that the rackIdSupplier is called, and then also verify it does what 
we need it to by verifying the behavior of the helper method you noted further 
down. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22207: [FLINK-31510][yarn] Use getMemorySize instead of getMemory.

2023-04-07 Thread via GitHub


reswqa commented on code in PR #22207:
URL: https://github.com/apache/flink/pull/22207#discussion_r1160752449


##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -1945,8 +1938,4 @@ Map generateApplicationMasterEnv(
 Utils.setupYarnClassPath(this.yarnConfiguration, env);
 return env;
 }
-
-private String getDisplayMemory(long memoryMB) {

Review Comment:
   It seems that some test case related to this is failed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22207: [FLINK-31510][yarn] Use getMemorySize instead of getMemory.

2023-04-07 Thread via GitHub


reswqa commented on code in PR #22207:
URL: https://github.com/apache/flink/pull/22207#discussion_r1160743167


##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -1945,8 +1938,4 @@ Map generateApplicationMasterEnv(
 Utils.setupYarnClassPath(this.yarnConfiguration, env);
 return env;
 }
-
-private String getDisplayMemory(long memoryMB) {

Review Comment:
   Why remove this method? Is it because of even if we use a long value, 
converting to byte here will still overflow? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22207: [FLINK-31510][yarn] Use getMemorySize instead of getMemory.

2023-04-07 Thread via GitHub


reswqa commented on code in PR #22207:
URL: https://github.com/apache/flink/pull/22207#discussion_r1160743167


##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -1945,8 +1938,4 @@ Map generateApplicationMasterEnv(
 Utils.setupYarnClassPath(this.yarnConfiguration, env);
 return env;
 }
-
-private String getDisplayMemory(long memoryMB) {

Review Comment:
   Even if we use a long value, converting to byte here will still overflow, 
right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-kafka] jeremy-degroot commented on a diff in pull request #20: [FLINK-29398][connector/kafka] Provide rack ID to Kafka Source to take advantage of Rack Awareness

2023-04-07 Thread via GitHub


jeremy-degroot commented on code in PR #20:
URL: 
https://github.com/apache/flink-connector-kafka/pull/20#discussion_r1160718505


##
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java:
##
@@ -319,6 +321,51 @@ public void 
testUsingCommittedOffsetsWithEarliestOrLatestOffsetResetStrategy(
 
assertThat(reader.consumer().position(partition)).isEqualTo(expectedOffset);
 }
 
+@Test
+public void testConsumerClientRackSupplier() {
+AtomicReference supplierCalled = new AtomicReference<>(false);
+String rackId = "use1-az1";
+Supplier rackIdSupplier =
+() -> {
+supplierCalled.set(true);
+return rackId;
+};
+Properties properties = new Properties();
+createReader(
+properties,
+UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
+rackIdSupplier);
+assertThat(supplierCalled.get()).isEqualTo(true);
+}
+
+@ParameterizedTest
+@NullAndEmptySource
+public void testSetConsumerClientRackIgnoresNullAndEmpty(String rackId) {
+Properties properties = new Properties();
+Supplier rackIdSupplier = () -> rackId;
+KafkaPartitionSplitReader reader =
+createReader(
+properties,
+
UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
+rackIdSupplier);
+reader.setConsumerClientRack(properties, rackIdSupplier);

Review Comment:
   As far as I know, neither the KafkaConsumer nor the 
KafkaPartitionSplitReader offers any way to examine the final consumer 
Properties, so to test the behavior of setConsumerClientRack we have to test it 
directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] huwh commented on a diff in pull request #22028: [FLINK-31230] Improve YarnClusterDescriptor memory unit display.

2023-04-07 Thread via GitHub


huwh commented on code in PR #22028:
URL: https://github.com/apache/flink/pull/22028#discussion_r1160704473


##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -1265,11 +1266,9 @@ private ApplicationReport startAppMaster(
 if (appState != lastAppState) {
 LOG.info("Deploying cluster, current state " + 
appState);
 }
-if (System.currentTimeMillis() - lastLogTime > 6) {
-lastLogTime = System.currentTimeMillis();
+if (System.currentTimeMillis() - startTime > 6) {
 LOG.info(
-"Deployment took more than {} seconds. Please 
check if the requested resources are available in the YARN cluster",
-(lastLogTime - startTime) / 1000);
+"Deployment took more than 60 seconds. Please 
check if the requested resources are available in the YARN cluster");

Review Comment:
   @slfan1989 Never mind.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31755) ROW function can not work with RewriteIntersectAllRule

2023-04-07 Thread Aitozi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aitozi updated FLINK-31755:
---
Description: 
Reproduce case:


{code:java}
create table row_sink (
  `b` ROW
) with (
  'connector' = 'values'
)

util.verifyRelPlanInsert(
"INSERT INTO row_sink " +
  "SELECT ROW(a, b) FROM complex_type_src intersect all " +
  "SELECT ROW(c, d) FROM complex_type_src ")

{code}

It will fails with 


{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
Difference:
EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
... 68 more
{code}


The reason is:

ROW function will generates the {{FULLY_QUALIFIED}} type. But after the 
{{RewriteIntersectAllRule}} optimization, it will produce the 
{{PEEK_FIELDS_NO_EXPAND}}. So the volcano planner complains with type mismatch.

  was:
Reproduce case:


{code:java}
create table row_sink (
  `b` ROW
) with (
  'connector' = 'values'
)

util.verifyRelPlanInsert(
"INSERT INTO row_sink " +
  "SELECT ROW(a, b) FROM complex_type_src intersect all " +
  "SELECT ROW(c, d) FROM complex_type_src ")

{code}

It will fails with 


{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
Difference:
EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
... 68 more
{code}


The reason is:

ROW function will generates the {{FULLY_QUALIFIED}} type. But after the 
{{RewriteIntersectAllRule}} optimization, it will produce the 
{{PEEK_FIELDS_NO_EXPAND}}. So the volcan planner complains with type mismatch.


> ROW function can not work with RewriteIntersectAllRule
> --
>
> Key: FLINK-31755
> URL: https://issues.apache.org/jira/browse/FLINK-31755
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>
> Reproduce case:
> {code:java}
> create table row_sink (
>   `b` ROW
> ) with (
>   'connector' = 'values'
> )
> util.verifyRelPlanInsert(
> "INSERT INTO row_sink " +
>   "SELECT ROW(a, b) FROM complex_type_src intersect all " +
>   "SELECT ROW(c, d) FROM complex_type_src ")
> {code}
> It will fails with 
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Type mismatch:
> rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
> equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
> Difference:
> EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) 
> CHARACTER SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
>   ... 68 more
> {code}
> The reason is:
> ROW function will generates the {{FULLY_QUALIFIED}} type. But after the 
> {{RewriteIntersectAllRule}} optimization, it will produce the 
> {{PEEK_FIELDS_NO_EXPAND}}. So the volcano planner complains with type 
> mismatch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31755) ROW function can not work with RewriteIntersectAllRule

2023-04-07 Thread Aitozi (Jira)
Aitozi created FLINK-31755:
--

 Summary: ROW function can not work with RewriteIntersectAllRule
 Key: FLINK-31755
 URL: https://issues.apache.org/jira/browse/FLINK-31755
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Aitozi


Reproduce case:


{code:java}
create table row_sink (
  `b` ROW
) with (
  'connector' = 'values'
)

util.verifyRelPlanInsert(
"INSERT INTO row_sink " +
  "SELECT ROW(a, b) FROM complex_type_src intersect all " +
  "SELECT ROW(c, d) FROM complex_type_src ")

{code}

It will fails with 


{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
Difference:
EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
... 68 more
{code}


The reason is:

ROW function will generates the {{FULLY_QUALIFIED}} type. But after the 
{{RewriteIntersectAllRule}} optimization, it will produce the 
{{PEEK_FIELDS_NO_EXPAND}}. So the volcan planner complains with type mismatch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] lindong28 commented on pull request #230: [FLINK-31753] Support DataStream CoGroup in stream mode with similar performance as DataSet CoGroup

2023-04-07 Thread via GitHub


lindong28 commented on PR #230:
URL: https://github.com/apache/flink-ml/pull/230#issuecomment-1500262051

   @zhipeng93 Can you help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31754) Build flink master error with Error in ASM processing class org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class: 19

2023-04-07 Thread dragon (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dragon updated FLINK-31754:
---
Description: 
maven 3.25

jdk 1.8

scala 2.12

window 10

[E:\Project\flink\flink\flink-table\flink-table-planner]$ mvn package 
-DskipTests -e

 

[INFO] Error stacktraces are turned on.[INFO] Scanning for projects...[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.flink:flink-table-planner_2.12:jar:1.18-SNAPSHOT
[WARNING] 'artifactId' contains an expression but should be a constant. @ 
org.apache.flink:flink-table-planner_${scala.binary.version}:[unknown-version], 
E:\Project\flink\flink\flink-table\flink-table-planner\pom.xml, line 29, column 
14
[WARNING] 
[WARNING] It is highly recommended to fix these problems because they threaten 
the stability of your build.
[WARNING] 
[WARNING] For this reason, future Maven versions might no longer support 
building such malformed projects.
[WARNING] 
[INFO]                                                                         
[INFO] 
[INFO] Building Flink : Table : Planner 1.18-SNAPSHOT
[INFO] 
[INFO] 
[INFO] — maven-checkstyle-plugin:3.1.2:check (validate) @ 
flink-table-planner_2.12 —
[WARNING] Old version of checkstyle detected. Consider updating to >= v8.30
[WARNING] For more information see: 
[https://maven.apache.org/plugins/maven-checkstyle-plugin/examples/upgrading-checkstyle.html]
[INFO] You have 0 Checkstyle violations.
[INFO] 
[INFO] — spotless-maven-plugin:2.27.1:check (spotless-check) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (enforce-maven-version) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (enforce-maven) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (ban-unsafe-snakeyaml) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (ban-unsafe-jackson) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (forbid-log4j-1) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce 
(forbid-direct-akka-rpc-dependencies) @ flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce 
(forbid-direct-table-planner-dependencies) @ flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (enforce-versions) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — directory-maven-plugin:0.1:directory-of (directories) @ 
flink-table-planner_2.12 —
[INFO] Directory of org.apache.flink:flink-parent set to: E:\Project\flink\flink
[INFO] 
[INFO] — maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-resources-plugin:3.1.0:resources (default-resources) @ 
flink-table-planner_2.12 —
[INFO] Using 'UTF-8' encoding to copy filtered resources.[INFO] Copying 1 
resource[INFO] Copying 3 resources[INFO] 
[INFO] — scala-maven-plugin:3.2.2:add-source (scala-compile-first) @ 
flink-table-planner_2.12 —
[INFO] Add Source directory: 
E:\Project\flink\flink\flink-table\flink-table-planner\src\main\scala
[INFO] Add Test Source directory: 
E:\Project\flink\flink\flink-table\flink-table-planner\src\test\scala
[INFO] 
[INFO] — scala-maven-plugin:3.2.2:compile (scala-compile-first) @ 
flink-table-planner_2.12 —
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] — maven-compiler-plugin:3.8.0:compile (default-compile) @ 
flink-table-planner_2.12 —
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] — maven-resources-plugin:3.1.0:testResources (default-testResources) @ 
flink-table-planner_2.12 —
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 371 resources[INFO] Copying 3 resources[INFO] 
[INFO] — scala-maven-plugin:3.2.2:testCompile (scala-test-compile) @ 
flink-table-planner_2.12 —
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] — maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ 
flink-table-planner_2.12 —
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] — maven-surefire-plugin:3.0.0-M5:test (default-test) @ 
flink-table-planner_2.12 —
[INFO] Tests are skipped.
[INFO] 
[INFO] — maven-jar-plugin:2.4:jar (default-jar) @ flink-table-planner_2.12 —
[INFO] Building jar: 
E:\Project\flink\flink\flink-table\flink-table-planner\target\flink-table-planner_2.12-1.18-SNAPSHOT.jar[INFO]
 
[INFO] — maven-jar-plugin:2.4:test-jar (default) @ flink-table-planner_2.12 —
[INFO] Building jar: 
E:\Project\flink\flink\flink-table\flink-table-planner\target\flink-table-planner_2.12-1.18-SNAPSHOT-tests.jar
[INFO] 
[INFO] — maven-shade-plugin:3.4.1:shade (shade-flink) @ 
flink-table-planner_2.12 —
[INFO] 

[jira] [Updated] (FLINK-31754) Build flink master error with Error in ASM processing class org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class: 19

2023-04-07 Thread dragon (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dragon updated FLINK-31754:
---
Description: 
maven 3.25

jdk 1.8

window 10

[E:\Project\flink\flink\flink-table\flink-table-planner]$ mvn package 
-DskipTests -e

 

[INFO] Error stacktraces are turned on.[INFO] Scanning for projects...[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.flink:flink-table-planner_2.12:jar:1.18-SNAPSHOT
[WARNING] 'artifactId' contains an expression but should be a constant. @ 
org.apache.flink:flink-table-planner_${scala.binary.version}:[unknown-version], 
E:\Project\flink\flink\flink-table\flink-table-planner\pom.xml, line 29, column 
14
[WARNING] 
[WARNING] It is highly recommended to fix these problems because they threaten 
the stability of your build.
[WARNING] 
[WARNING] For this reason, future Maven versions might no longer support 
building such malformed projects.
[WARNING] 
[INFO]                                                                         
[INFO] 
[INFO] Building Flink : Table : Planner 1.18-SNAPSHOT
[INFO] 
[INFO] 
[INFO] — maven-checkstyle-plugin:3.1.2:check (validate) @ 
flink-table-planner_2.12 —
[WARNING] Old version of checkstyle detected. Consider updating to >= v8.30
[WARNING] For more information see: 
[https://maven.apache.org/plugins/maven-checkstyle-plugin/examples/upgrading-checkstyle.html]
[INFO] You have 0 Checkstyle violations.
[INFO] 
[INFO] — spotless-maven-plugin:2.27.1:check (spotless-check) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (enforce-maven-version) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (enforce-maven) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (ban-unsafe-snakeyaml) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (ban-unsafe-jackson) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (forbid-log4j-1) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce 
(forbid-direct-akka-rpc-dependencies) @ flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce 
(forbid-direct-table-planner-dependencies) @ flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-enforcer-plugin:3.1.0:enforce (enforce-versions) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — directory-maven-plugin:0.1:directory-of (directories) @ 
flink-table-planner_2.12 —
[INFO] Directory of org.apache.flink:flink-parent set to: E:\Project\flink\flink
[INFO] 
[INFO] — maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ 
flink-table-planner_2.12 —
[INFO] 
[INFO] — maven-resources-plugin:3.1.0:resources (default-resources) @ 
flink-table-planner_2.12 —
[INFO] Using 'UTF-8' encoding to copy filtered resources.[INFO] Copying 1 
resource[INFO] Copying 3 resources[INFO] 
[INFO] — scala-maven-plugin:3.2.2:add-source (scala-compile-first) @ 
flink-table-planner_2.12 —
[INFO] Add Source directory: 
E:\Project\flink\flink\flink-table\flink-table-planner\src\main\scala
[INFO] Add Test Source directory: 
E:\Project\flink\flink\flink-table\flink-table-planner\src\test\scala
[INFO] 
[INFO] — scala-maven-plugin:3.2.2:compile (scala-compile-first) @ 
flink-table-planner_2.12 —
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] — maven-compiler-plugin:3.8.0:compile (default-compile) @ 
flink-table-planner_2.12 —
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] — maven-resources-plugin:3.1.0:testResources (default-testResources) @ 
flink-table-planner_2.12 —
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 371 resources[INFO] Copying 3 resources[INFO] 
[INFO] — scala-maven-plugin:3.2.2:testCompile (scala-test-compile) @ 
flink-table-planner_2.12 —
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] — maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ 
flink-table-planner_2.12 —
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] — maven-surefire-plugin:3.0.0-M5:test (default-test) @ 
flink-table-planner_2.12 —
[INFO] Tests are skipped.
[INFO] 
[INFO] — maven-jar-plugin:2.4:jar (default-jar) @ flink-table-planner_2.12 —
[INFO] Building jar: 
E:\Project\flink\flink\flink-table\flink-table-planner\target\flink-table-planner_2.12-1.18-SNAPSHOT.jar[INFO]
 
[INFO] — maven-jar-plugin:2.4:test-jar (default) @ flink-table-planner_2.12 —
[INFO] Building jar: 
E:\Project\flink\flink\flink-table\flink-table-planner\target\flink-table-planner_2.12-1.18-SNAPSHOT-tests.jar
[INFO] 
[INFO] — maven-shade-plugin:3.4.1:shade (shade-flink) @ 
flink-table-planner_2.12 —
[INFO] Including 

[jira] [Updated] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

2023-04-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31753:
---
Labels: pull-request-available  (was: )

> Support DataStream CoGroup in stream Mode with similar performance as DataSet 
> CoGroup
> -
>
> Key: FLINK-31753
> URL: https://issues.apache.org/jira/browse/FLINK-31753
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>  Labels: pull-request-available
> Fix For: ml-2.3.0
>
>
> DataSet has been deprecated and will be removed from Flink. However, 
> DataStream CoCroup is still considerably slower than DataSet when co-grouping 
> two bounded streams.
> Here are the benchmark results of co-grouping two bounded streams with 4*10^6 
> records from each stream under different modes. The co-group function is 
> chosen to be very lightweight so that benchmark is dominated by the Flink's 
> co-group overhead.
> DataSet: 5.6 sec
> DataStream batch mode: 15.4 sec
> DataStream stream mode with rocksdb: 81 sec
> We should be able to performance co-group operation in DataStream stream mode 
> so that users' don't have to take big regression in order to migrate from 
> DataSet to DataStream.
> We will first add util function in Flink ML to unblock the migration of some 
> algorithms from Alink to Flink ML.
> Here is the code used to benchmark DataSet's CoGroup.
> {code:java}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> env.getConfig().disableGenericTypes();
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.setParallelism(1);
> DataSet> data1 =
> env.fromCollection(
> new DataGenerator(numRecords),
> Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> DataSet> data2 =
> env.fromCollection(
> new DataGenerator(numRecords),
> Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> data1.coGroup(data2)
> .where((KeySelector, Integer>) tuple 
> -> tuple.f0)
> .equalTo((KeySelector, Integer>) 
> tuple -> tuple.f0)
> .with(
> new RichCoGroupFunction<
> Tuple3,
> Tuple3,
> Integer>() {
> @Override
> public void open(Configuration parameters) throws 
> Exception {
> super.open(parameters);
> }
> @Override
> public void close() throws Exception {
> super.close();
> }
> @Override
> public void coGroup(
> Iterable> 
> iterable,
> Iterable> 
> iterable1,
> Collector collector)
> throws Exception {
> collector.collect(1);
> }
> })
> .write(new CountingAndDiscardingSink(), "/tmp");
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] lindong28 opened a new pull request, #230: [FLINK-31753] Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

2023-04-07 Thread via GitHub


lindong28 opened a new pull request, #230:
URL: https://github.com/apache/flink-ml/pull/230

   ## What is the purpose of the change
   
   Add util methods that allow algorithm developers to co-group two DataStreams 
with the same semantics and similar performance as `DataSet#coGroup(...)`
   
   Here are the results of running the benchmark specified in FLINK-31753's 
JIRA description:
   - DataSet#coGroup takes 27.6 seconds.
   - DataStreamUtils#coGroup takes  31.5 seconds.
   
   The DataStream is roughly 12.3% slower than DataSet. The performance 
difference should be negligible for real-word applications whose co-group 
function is non-trivial.
   
   ## Brief change log
   
   Added the static method `DataStreamUtils#coGroup(...)`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31754) Build flink master error with Error in ASM processing class org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class: 19

2023-04-07 Thread dragon (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dragon updated FLINK-31754:
---
Affects Version/s: 1.18.0

> Build flink master error with Error in ASM processing class 
> org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class: 19
> -
>
> Key: FLINK-31754
> URL: https://issues.apache.org/jira/browse/FLINK-31754
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: dragon
>Priority: Major
>
> maven 3.25
> jdk 1.8
> [E:\Project\flink\flink\flink-table\flink-table-planner]$ mvn package 
> -DskipTests -e
>  
> [INFO] Error stacktraces are turned on.[INFO] Scanning for 
> projects...[WARNING] 
> [WARNING] Some problems were encountered while building the effective model 
> for org.apache.flink:flink-table-planner_2.12:jar:1.18-SNAPSHOT
> [WARNING] 'artifactId' contains an expression but should be a constant. @ 
> org.apache.flink:flink-table-planner_${scala.binary.version}:[unknown-version],
>  E:\Project\flink\flink\flink-table\flink-table-planner\pom.xml, line 29, 
> column 14
> [WARNING] 
> [WARNING] It is highly recommended to fix these problems because they 
> threaten the stability of your build.
> [WARNING] 
> [WARNING] For this reason, future Maven versions might no longer support 
> building such malformed projects.
> [WARNING] 
> [INFO]                                                                        
>  
> [INFO] 
> 
> [INFO] Building Flink : Table : Planner 1.18-SNAPSHOT
> [INFO] 
> 
> [INFO] 
> [INFO] --- maven-checkstyle-plugin:3.1.2:check (validate) @ 
> flink-table-planner_2.12 ---
> [WARNING] Old version of checkstyle detected. Consider updating to >= v8.30
> [WARNING] For more information see: 
> https://maven.apache.org/plugins/maven-checkstyle-plugin/examples/upgrading-checkstyle.html
> [INFO] You have 0 Checkstyle violations.
> [INFO] 
> [INFO] --- spotless-maven-plugin:2.27.1:check (spotless-check) @ 
> flink-table-planner_2.12 ---
> [INFO] 
> [INFO] --- maven-enforcer-plugin:3.1.0:enforce (enforce-maven-version) @ 
> flink-table-planner_2.12 ---
> [INFO] 
> [INFO] --- maven-enforcer-plugin:3.1.0:enforce (enforce-maven) @ 
> flink-table-planner_2.12 ---
> [INFO] 
> [INFO] --- maven-enforcer-plugin:3.1.0:enforce (ban-unsafe-snakeyaml) @ 
> flink-table-planner_2.12 ---
> [INFO] 
> [INFO] --- maven-enforcer-plugin:3.1.0:enforce (ban-unsafe-jackson) @ 
> flink-table-planner_2.12 ---
> [INFO] 
> [INFO] --- maven-enforcer-plugin:3.1.0:enforce (forbid-log4j-1) @ 
> flink-table-planner_2.12 ---
> [INFO] 
> [INFO] --- maven-enforcer-plugin:3.1.0:enforce 
> (forbid-direct-akka-rpc-dependencies) @ flink-table-planner_2.12 ---
> [INFO] 
> [INFO] --- maven-enforcer-plugin:3.1.0:enforce 
> (forbid-direct-table-planner-dependencies) @ flink-table-planner_2.12 ---
> [INFO] 
> [INFO] --- maven-enforcer-plugin:3.1.0:enforce (enforce-versions) @ 
> flink-table-planner_2.12 ---
> [INFO] 
> [INFO] --- directory-maven-plugin:0.1:directory-of (directories) @ 
> flink-table-planner_2.12 ---
> [INFO] Directory of org.apache.flink:flink-parent set to: 
> E:\Project\flink\flink
> [INFO] 
> [INFO] --- maven-remote-resources-plugin:1.5:process 
> (process-resource-bundles) @ flink-table-planner_2.12 ---
> [INFO] 
> [INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ 
> flink-table-planner_2.12 ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.[INFO] Copying 1 
> resource[INFO] Copying 3 resources[INFO] 
> [INFO] --- scala-maven-plugin:3.2.2:add-source (scala-compile-first) @ 
> flink-table-planner_2.12 ---
> [INFO] Add Source directory: 
> E:\Project\flink\flink\flink-table\flink-table-planner\src\main\scala
> [INFO] Add Test Source directory: 
> E:\Project\flink\flink\flink-table\flink-table-planner\src\test\scala
> [INFO] 
> [INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ 
> flink-table-planner_2.12 ---
> [INFO] Nothing to compile - all classes are up to date
> [INFO] 
> [INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @ 
> flink-table-planner_2.12 ---
> [INFO] Nothing to compile - all classes are up to date
> [INFO] 
> [INFO] --- maven-resources-plugin:3.1.0:testResources (default-testResources) 
> @ flink-table-planner_2.12 ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] Copying 371 resources[INFO] Copying 3 resources[INFO] 
> [INFO] --- scala-maven-plugin:3.2.2:testCompile (scala-test-compile) @ 
> flink-table-planner_2.12 ---
> [INFO] Nothing to compile - all classes are 

[GitHub] [flink] slfan1989 commented on a diff in pull request #22028: [FLINK-31230] Improve YarnClusterDescriptor memory unit display.

2023-04-07 Thread via GitHub


slfan1989 commented on code in PR #22028:
URL: https://github.com/apache/flink/pull/22028#discussion_r1160653403


##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -1265,11 +1266,9 @@ private ApplicationReport startAppMaster(
 if (appState != lastAppState) {
 LOG.info("Deploying cluster, current state " + 
appState);
 }
-if (System.currentTimeMillis() - lastLogTime > 6) {
-lastLogTime = System.currentTimeMillis();
+if (System.currentTimeMillis() - startTime > 6) {
 LOG.info(
-"Deployment took more than {} seconds. Please 
check if the requested resources are available in the YARN cluster",
-(lastLogTime - startTime) / 1000);
+"Deployment took more than 60 seconds. Please 
check if the requested resources are available in the YARN cluster");

Review Comment:
   @1996fanrui @huwh Sorry, I didn't see the feedback question. When I rebase 
the master code later, I will be more careful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-31603) Line break should be removed in create table with-clauses, load module with-clauses and table hints for both keys and values

2023-04-07 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709664#comment-17709664
 ] 

Jark Wu edited comment on FLINK-31603 at 4/7/23 11:36 AM:
--

Hi [~paul8263], this is an expected behavior in SQL, because there is indeed a 
line break in the string literal. MySQL and other databases also have the same 
behavior. 

{code}
mysql> select 'abc
edf' as str;
+-+
| str |
+-+
| abc
edf |
+-+
1 row in set (0.01 sec)
{code}


was (Author: jark):
Hi [~paul8263], this is an expected behavior in SQL, because there is indeed a 
line break in the string literal. MySQL and other databases also have the same 
behavior. 

{code}
mysql> select 'abc
edf';
+-+
| abc
edf |
+-+
| abc
edf |
+-+
1 row in set (0.01 sec)
{code}

> Line break should be removed in create table with-clauses, load module 
> with-clauses and table hints for both keys and values
> 
>
> Key: FLINK-31603
> URL: https://issues.apache.org/jira/browse/FLINK-31603
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.16.1
> Environment: Flink 1.16.0
>Reporter: Yao Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Given a SQL like this:
> {code:sql}
> CREATE TABLE MyTable (
>   `user_id` BIGINT,
>   `name` STRING,
>   `timestamp` TIMESTAMP_LTZ(3) METADATA
> ) WITH (
>   'connector' = 'kaf
> ka'
>   ...
> );
> {code}
> After parsing the SQL, the option value 'connector' is 'kaf\nka', which will 
> lead to problems.
> The line break inside keys/values in with-clauses and table hints should be 
> removed when parsing SQLs.
> If this is the issue that needs to fix, I would like to do it, as I am 
> currently working on it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31603) Line break should be removed in create table with-clauses, load module with-clauses and table hints for both keys and values

2023-04-07 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709664#comment-17709664
 ] 

Jark Wu commented on FLINK-31603:
-

Hi [~paul8263], this is an expected behavior in SQL, because there is indeed a 
line break in the string literal. MySQL and other databases also have the same 
behavior. 

{code}
mysql> select 'abc
edf';
+-+
| abc
edf |
+-+
| abc
edf |
+-+
1 row in set (0.01 sec)
{code}

> Line break should be removed in create table with-clauses, load module 
> with-clauses and table hints for both keys and values
> 
>
> Key: FLINK-31603
> URL: https://issues.apache.org/jira/browse/FLINK-31603
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.16.1
> Environment: Flink 1.16.0
>Reporter: Yao Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Given a SQL like this:
> {code:sql}
> CREATE TABLE MyTable (
>   `user_id` BIGINT,
>   `name` STRING,
>   `timestamp` TIMESTAMP_LTZ(3) METADATA
> ) WITH (
>   'connector' = 'kaf
> ka'
>   ...
> );
> {code}
> After parsing the SQL, the option value 'connector' is 'kaf\nka', which will 
> lead to problems.
> The line break inside keys/values in with-clauses and table hints should be 
> removed when parsing SQLs.
> If this is the issue that needs to fix, I would like to do it, as I am 
> currently working on it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31754) Build flink master error with Error in ASM processing class org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class: 19

2023-04-07 Thread zhenlong dong (Jira)
zhenlong dong created FLINK-31754:
-

 Summary: Build flink master error with Error in ASM processing 
class 
org/apache/calcite/sql/validate/SqlValidatorImpl$NavigationExpander.class: 19
 Key: FLINK-31754
 URL: https://issues.apache.org/jira/browse/FLINK-31754
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: zhenlong dong


maven 3.25

jdk 1.8

[E:\Project\flink\flink\flink-table\flink-table-planner]$ mvn package 
-DskipTests -e

 

[INFO] Error stacktraces are turned on.[INFO] Scanning for projects...[WARNING] 
[WARNING] Some problems were encountered while building the effective model for 
org.apache.flink:flink-table-planner_2.12:jar:1.18-SNAPSHOT
[WARNING] 'artifactId' contains an expression but should be a constant. @ 
org.apache.flink:flink-table-planner_${scala.binary.version}:[unknown-version], 
E:\Project\flink\flink\flink-table\flink-table-planner\pom.xml, line 29, column 
14
[WARNING] 
[WARNING] It is highly recommended to fix these problems because they threaten 
the stability of your build.
[WARNING] 
[WARNING] For this reason, future Maven versions might no longer support 
building such malformed projects.
[WARNING] 
[INFO]                                                                         
[INFO] 
[INFO] Building Flink : Table : Planner 1.18-SNAPSHOT
[INFO] 
[INFO] 
[INFO] --- maven-checkstyle-plugin:3.1.2:check (validate) @ 
flink-table-planner_2.12 ---
[WARNING] Old version of checkstyle detected. Consider updating to >= v8.30
[WARNING] For more information see: 
https://maven.apache.org/plugins/maven-checkstyle-plugin/examples/upgrading-checkstyle.html
[INFO] You have 0 Checkstyle violations.
[INFO] 
[INFO] --- spotless-maven-plugin:2.27.1:check (spotless-check) @ 
flink-table-planner_2.12 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.1.0:enforce (enforce-maven-version) @ 
flink-table-planner_2.12 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.1.0:enforce (enforce-maven) @ 
flink-table-planner_2.12 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.1.0:enforce (ban-unsafe-snakeyaml) @ 
flink-table-planner_2.12 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.1.0:enforce (ban-unsafe-jackson) @ 
flink-table-planner_2.12 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.1.0:enforce (forbid-log4j-1) @ 
flink-table-planner_2.12 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.1.0:enforce 
(forbid-direct-akka-rpc-dependencies) @ flink-table-planner_2.12 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.1.0:enforce 
(forbid-direct-table-planner-dependencies) @ flink-table-planner_2.12 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.1.0:enforce (enforce-versions) @ 
flink-table-planner_2.12 ---
[INFO] 
[INFO] --- directory-maven-plugin:0.1:directory-of (directories) @ 
flink-table-planner_2.12 ---
[INFO] Directory of org.apache.flink:flink-parent set to: E:\Project\flink\flink
[INFO] 
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) 
@ flink-table-planner_2.12 ---
[INFO] 
[INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ 
flink-table-planner_2.12 ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.[INFO] Copying 1 
resource[INFO] Copying 3 resources[INFO] 
[INFO] --- scala-maven-plugin:3.2.2:add-source (scala-compile-first) @ 
flink-table-planner_2.12 ---
[INFO] Add Source directory: 
E:\Project\flink\flink\flink-table\flink-table-planner\src\main\scala
[INFO] Add Test Source directory: 
E:\Project\flink\flink\flink-table\flink-table-planner\src\test\scala
[INFO] 
[INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ 
flink-table-planner_2.12 ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @ 
flink-table-planner_2.12 ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-resources-plugin:3.1.0:testResources (default-testResources) @ 
flink-table-planner_2.12 ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 371 resources[INFO] Copying 3 resources[INFO] 
[INFO] --- scala-maven-plugin:3.2.2:testCompile (scala-test-compile) @ 
flink-table-planner_2.12 ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ 
flink-table-planner_2.12 ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-surefire-plugin:3.0.0-M5:test (default-test) @ 
flink-table-planner_2.12 ---
[INFO] Tests are skipped.
[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ flink-table-planner_2.12 ---
[INFO] Building jar: 

[GitHub] [flink] slfan1989 commented on pull request #22207: [FLINK-31510][yarn] Use getMemorySize instead of getMemory.

2023-04-07 Thread via GitHub


slfan1989 commented on PR #22207:
URL: https://github.com/apache/flink/pull/22207#issuecomment-1500183076

   @reswqa Can you help review this PR again? Thank you very much! I explained 
the reason for possible negative numbers.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] huwh commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

2023-04-07 Thread via GitHub


huwh commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1160572180


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -191,7 +192,12 @@ public Optional getRestEndpoint(String 
clusterId) {
 
 @Override
 public List getPodsWithLabels(Map labels) {
-final List podList = 
this.internalClient.pods().withLabels(labels).list().getItems();
+final List podList =
+this.internalClient
+.pods()
+.withLabels(labels)
+.list(new 
ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   This "0" could be extracted as a static final variable



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
 server.expect().get().withPath(path).andReturn(500, "Expected 
error").always();
 }
 
+protected void mockPodEventWithLabels(Map labels) {
+final Pod pod1 =

Review Comment:
   maybe "pod" is enough, since there is only one pod here



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
 server.expect().get().withPath(path).andReturn(500, "Expected 
error").always();
 }
 
+protected void mockPodEventWithLabels(Map labels) {

Review Comment:
   It's better to move the namespace, name, resource version to function 
arguments.
   



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##
@@ -277,6 +281,23 @@ void testStopPod() throws ExecutionException, 
InterruptedException {
 
assertThat(this.kubeClient.pods().inNamespace(NAMESPACE).withName(podName).get()).isNull();
 }
 
+@Test
+void testGetPodsWithLabels() {
+final String podName = "pod-with-labels";
+final Pod pod =
+new PodBuilder()
+.editOrNewMetadata()
+.withName(podName)
+.withLabels(TESTING_LABELS)
+.endMetadata()
+.editOrNewSpec()
+.endSpec()
+.build();
+this.kubeClient.pods().inNamespace(NAMESPACE).create(pod);
+List kubernetesPods = 
this.flinkKubeClient.getPodsWithLabels(TESTING_LABELS);
+assertThat(kubernetesPods.size()).isEqualTo(1);

Review Comment:
   Should check the list pod is excepted one
   
   assertThat(kubernetesPods)
   .satisfiesExactly(
   kubernetesPod -> 
assertThat(kubernetesPod.getName()).isEqualTo(podName));



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
 .isEmpty();
 }
 
+@Test
+void testWatchPodsAndDoCallback() throws Exception {
+mockPodEventWithLabels(TESTING_LABELS);
+// the count latch for events.
+final CountDownLatch eventLatch = new CountDownLatch(4);
+this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
   It's better to check each event is received.
   
   Maybe you can use three CountDownLatch and a anonymous classes here 



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##
@@ -630,4 +661,39 @@ private KubernetesConfigMap buildTestingConfigMap() {
 .withData(data)
 .build());
 }
+
+private class TestingKubernetesPodCallbackHandler
+implements FlinkKubeClient.WatchCallbackHandler {
+
+private final CountDownLatch eventLatch;
+
+public TestingKubernetesPodCallbackHandler(CountDownLatch eventLatch) {
+this.eventLatch = eventLatch;
+}
+
+@Override
+public void onAdded(List resources) {
+this.eventLatch.countDown();
+}
+
+@Override
+public void onModified(List resources) {
+this.eventLatch.countDown();
+}
+
+@Override
+public void onDeleted(List resources) {
+this.eventLatch.countDown();
+}
+
+@Override
+public void onError(List resources) {

Review Comment:
   ERROR event will not trigger Watcher#eventReceived, so this unit test will 
failed.
   
   Maybe we can just skip this in unit test.
   
   ref: 
https://github.com/fabric8io/kubernetes-client/blob/master/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java#L325



##

[GitHub] [flink] pltbkd commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

2023-04-07 Thread via GitHub


pltbkd commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1160621805


##
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##
@@ -178,9 +178,16 @@ void announceCombinedWatermark() {
 
aggregator.getAggregatedWatermark().getTimestamp());
 });
 
-long maxAllowedWatermark =
-globalCombinedWatermark.getTimestamp()
-+ 
watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+long maxAllowedWatermark;
+try {
+maxAllowedWatermark =

Review Comment:
   Thanks for the update! LGTM now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

2023-04-07 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31753:
-
Description: 
DataSet has been deprecated and will be removed from Flink. However, DataStream 
CoCroup is still considerably slower than DataSet when co-grouping two bounded 
streams.

Here are the benchmark results of co-grouping two bounded streams with 4*10^6 
records from each stream under different modes. The co-group function is chosen 
to be very lightweight so that benchmark is dominated by the Flink's co-group 
overhead.

DataSet: 5.6 sec
DataStream batch mode: 15.4 sec
DataStream stream mode with rocksdb: 81 sec

We should be able to performance co-group operation in DataStream stream mode 
so that users' don't have to take big regression in order to migrate from 
DataSet to DataStream.

We will first add util function in Flink ML to unblock the migration of some 
algorithms from Alink to Flink ML.

Here is the code used to benchmark DataSet's CoGroup.

{code:java}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.getConfig().disableGenericTypes();
env.setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(1);

DataSet> data1 =
env.fromCollection(
new DataGenerator(numRecords),
Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
DataSet> data2 =
env.fromCollection(
new DataGenerator(numRecords),
Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));

data1.coGroup(data2)
.where((KeySelector, Integer>) tuple 
-> tuple.f0)
.equalTo((KeySelector, Integer>) tuple 
-> tuple.f0)
.with(
new RichCoGroupFunction<
Tuple3,
Tuple3,
Integer>() {

@Override
public void open(Configuration parameters) throws Exception 
{
super.open(parameters);
}

@Override
public void close() throws Exception {
super.close();
}

@Override
public void coGroup(
Iterable> iterable,
Iterable> 
iterable1,
Collector collector)
throws Exception {
collector.collect(1);
}
})
.write(new CountingAndDiscardingSink(), "/tmp");
{code}



  was:
DataSet has been deprecated and will be removed from Flink. However, DataStream 
CoCroup is still considerably slower than DataSet when co-grouping two bounded 
streams.

Here is the benchmark result of co-grouping two bounded stream with 4*10^6 
records from each stream. The co-group function is chosen to be very 
lightweight so that we only 







> Support DataStream CoGroup in stream Mode with similar performance as DataSet 
> CoGroup
> -
>
> Key: FLINK-31753
> URL: https://issues.apache.org/jira/browse/FLINK-31753
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: ml-2.3.0
>
>
> DataSet has been deprecated and will be removed from Flink. However, 
> DataStream CoCroup is still considerably slower than DataSet when co-grouping 
> two bounded streams.
> Here are the benchmark results of co-grouping two bounded streams with 4*10^6 
> records from each stream under different modes. The co-group function is 
> chosen to be very lightweight so that benchmark is dominated by the Flink's 
> co-group overhead.
> DataSet: 5.6 sec
> DataStream batch mode: 15.4 sec
> DataStream stream mode with rocksdb: 81 sec
> We should be able to performance co-group operation in DataStream stream mode 
> so that users' don't have to take big regression in order to migrate from 
> DataSet to DataStream.
> We will first add util function in Flink ML to unblock the migration of some 
> algorithms from Alink to Flink ML.
> Here is the code used to benchmark DataSet's CoGroup.
> {code:java}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> env.getConfig().disableGenericTypes();
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.setParallelism(1);
> DataSet> data1 =
> env.fromCollection(
> new DataGenerator(numRecords),
> Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> DataSet> data2 =
> env.fromCollection(
> new DataGenerator(numRecords),
> 

[jira] [Updated] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

2023-04-07 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-31753:
-
Description: 
DataSet has been deprecated and will be removed from Flink. However, DataStream 
CoCroup is still considerably slower than DataSet when co-grouping two bounded 
streams.

Here is the benchmark result of co-grouping two bounded stream with 4*10^6 
records from each stream. The co-group function is chosen to be very 
lightweight so that we only 






> Support DataStream CoGroup in stream Mode with similar performance as DataSet 
> CoGroup
> -
>
> Key: FLINK-31753
> URL: https://issues.apache.org/jira/browse/FLINK-31753
> Project: Flink
>  Issue Type: Bug
>  Components: Library / Machine Learning
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: ml-2.3.0
>
>
> DataSet has been deprecated and will be removed from Flink. However, 
> DataStream CoCroup is still considerably slower than DataSet when co-grouping 
> two bounded streams.
> Here is the benchmark result of co-grouping two bounded stream with 4*10^6 
> records from each stream. The co-group function is chosen to be very 
> lightweight so that we only 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

2023-04-07 Thread Dong Lin (Jira)
Dong Lin created FLINK-31753:


 Summary: Support DataStream CoGroup in stream Mode with similar 
performance as DataSet CoGroup
 Key: FLINK-31753
 URL: https://issues.apache.org/jira/browse/FLINK-31753
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Reporter: Dong Lin
Assignee: Dong Lin
 Fix For: ml-2.3.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] gj199575 commented on pull request #22103: [FLINK-28825] Add K8S pod scheduler into Kubernetes options [FLINK-28829] Support prepreparing K8S resources before JM creation [FLINK-28831

2023-04-07 Thread via GitHub


gj199575 commented on PR #22103:
URL: https://github.com/apache/flink/pull/22103#issuecomment-1500120459

   > We are still using flink1.12. What should I do to use this pr in 1.12 or 
1.15? I tried to merge this pr in 1.12 and found that the difference is too big 
and it is difficult to merge. If you can, I hope you can guide me. Thanks
   
   ok. and I think it is also simple to  use this PR with flink 1.12.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-31752) SourceOperatorStreamTask increments numRecordsOut twice

2023-04-07 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709633#comment-17709633
 ] 

Dong Lin commented on FLINK-31752:
--

[~huwh] Thanks for reporting this bug. I will look into this.

> SourceOperatorStreamTask increments numRecordsOut twice
> ---
>
> Key: FLINK-31752
> URL: https://issues.apache.org/jira/browse/FLINK-31752
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.17.0
>Reporter: Weihua Hu
>Assignee: Dong Lin
>Priority: Major
> Attachments: image-2023-04-07-15-51-44-304.png
>
>
> The counter of numRecordsOut was introduce to ChainingOutput to reduce the 
> function call stack depth in 
> https://issues.apache.org/jira/browse/FLINK-30536
> But SourceOperatorStreamTask.AsyncDataOutputToOutput increments the counter 
> of numRecordsOut too. This results in the source operator's numRecordsOut are 
> doubled.
> We should delete the numRecordsOut.inc in 
> SourceOperatorStreamTask.AsyncDataOutputToOutput.
> [~xtsong][~lindong] Could you please take a look at this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31752) SourceOperatorStreamTask increments numRecordsOut twice

2023-04-07 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned FLINK-31752:


Assignee: Dong Lin

> SourceOperatorStreamTask increments numRecordsOut twice
> ---
>
> Key: FLINK-31752
> URL: https://issues.apache.org/jira/browse/FLINK-31752
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.17.0
>Reporter: Weihua Hu
>Assignee: Dong Lin
>Priority: Major
> Attachments: image-2023-04-07-15-51-44-304.png
>
>
> The counter of numRecordsOut was introduce to ChainingOutput to reduce the 
> function call stack depth in 
> https://issues.apache.org/jira/browse/FLINK-30536
> But SourceOperatorStreamTask.AsyncDataOutputToOutput increments the counter 
> of numRecordsOut too. This results in the source operator's numRecordsOut are 
> doubled.
> We should delete the numRecordsOut.inc in 
> SourceOperatorStreamTask.AsyncDataOutputToOutput.
> [~xtsong][~lindong] Could you please take a look at this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-31726) PyFlink module java.base does not "opens java.lang" to unnamed module

2023-04-07 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang closed FLINK-31726.

Resolution: Not A Problem

Flink doesn't support JDK 19, you can try to use jdk 8 or jdk 11.

> PyFlink module java.base does not "opens java.lang" to unnamed module
> -
>
> Key: FLINK-31726
> URL: https://issues.apache.org/jira/browse/FLINK-31726
> Project: Flink
>  Issue Type: Bug
>Reporter: padavan
>Priority: Major
>
> I want to run simple example from Flink documentation. And after start i got 
> exception:
> {code:java}
> Unable to make field private final byte[] java.lang.String.value accessible: 
> module java.base does not "opens java.lang" to unnamed module @228575c0{code}
> Installed:
> {code:java}
> Python 3.10.6
> openjdk version "19.0.2" 2023-01-17 
> OpenJDK Runtime Environment (build 19.0.2+7-Ubuntu-0ubuntu322.04) 
> OpenJDK 64-Bit Server VM (build 19.0.2+7-Ubuntu-0ubuntu322.04, mixed mode, 
> sharing){code}
> Simple code from flink site:
> [https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/datastream/word_count.html]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-04-07 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709623#comment-17709623
 ] 

dalongliu commented on FLINK-30989:
---

[~Weijie Guo] Yes, we need to pick it back to release-1.16.

> Configuration table.exec.spill-compression.block-size not take effect in 
> batch job
> --
>
> Key: FLINK-30989
> URL: https://issues.apache.org/jira/browse/FLINK-30989
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: shen
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.2, 1.18.0, 1.17.1
>
> Attachments: image-2023-02-09-19-37-44-927.png
>
>
> h1. Description
> I tried to config table.exec.spill-compression.block-size in TableEnv in my 
> job and failed. I  attached to TaskManager and found conf passed to 
> constructor of 
> [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
>  is empty:
> !image-2023-02-09-19-37-44-927.png|width=306,height=185!
> h1. How to reproduce
> A simple code to reproduce this problem:
> {code:java}
> // App.java
> package test.flink403;
> import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
> import static 
> org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.configuration.AlgorithmOptions;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.config.ExecutionConfigOptions;
> import java.util.Arrays; public class App {
>   public static void main(String argc[]) throws Exception {
> Configuration config = new Configuration();
> config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
> config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, 
> true);
> config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
> config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); 
> // < cannot take effect
> config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment(1, config);
> final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
> tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 
> m"); // < cannot take effect
> final DataStream orderA =
> env.fromCollection(
> Arrays.asList(
> new Order(1L, "beer", 3),
> new Order(1L, "diaper", 4),
> new Order(3L, "rubber", 2)));
> final Table tableA = tableEnv.fromDataStream(orderA);
> final Table result =
> tableEnv.sqlQuery(
> "SELECT * FROM "
> + tableA
> + " "
> + " order by user");
> tableEnv.toDataStream(result, Order.class).print();
> env.execute();
>   }
> }
> // ---
> // Order.java
> package test.flink403;
> public class Order {
>   public Long user;
>   public String product;
>   public int amount;
>   // for POJO detection in DataStream API
>   public Order() {}
>   // for structured type detection in Table API
>   public Order(Long user, String product, int amount) {
> this.user = user;
> this.product = product;
> this.amount = amount;
>   }
>   @Override
>   public String toString() {
> return "Order{"
> + "user="
> + user
> + ", product='"
> + product
> + '\''
> + ", amount="
> + amount
> + '}';
>   }
> }{code}
>  
> I think it is because 
> [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
>  try to get conf from JobConfiguration, which should be set in JobGraph. 
> Following are the Classes use the same method to get conf from 
> JobConfiguration:
>  * BinaryExternalSorter
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
>  ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
>  ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
>  ** 

[jira] [Updated] (FLINK-31752) SourceOperatorStreamTask increments numRecordsOut twice

2023-04-07 Thread Weihua Hu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weihua Hu updated FLINK-31752:
--
Description: 
The counter of numRecordsOut was introduce to ChainingOutput to reduce the 
function call stack depth in 
https://issues.apache.org/jira/browse/FLINK-30536

But SourceOperatorStreamTask.AsyncDataOutputToOutput increments the counter of 
numRecordsOut too. This results in the source operator's numRecordsOut are 
doubled.

We should delete the numRecordsOut.inc in 
SourceOperatorStreamTask.AsyncDataOutputToOutput.

[~xtsong][~lindong] Could you please take a look at this.


  was:
The counter of numRecordsOut was introduce to ChainingOutput to reduce the 
function call stack depth in 
https://issues.apache.org/jira/browse/FLINK-30536

But SourceOperatorStreamTask.AsyncDataOutputToOutput increments the counter of 
numRecordsOut too. This results in the source operator's numRecordsOut are 
doubled.

We should delete the numRecordsOut.inc in 
SourceOperatorStreamTask.AsyncDataOutputToOutput



> SourceOperatorStreamTask increments numRecordsOut twice
> ---
>
> Key: FLINK-31752
> URL: https://issues.apache.org/jira/browse/FLINK-31752
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.17.0
>Reporter: Weihua Hu
>Priority: Major
> Attachments: image-2023-04-07-15-51-44-304.png
>
>
> The counter of numRecordsOut was introduce to ChainingOutput to reduce the 
> function call stack depth in 
> https://issues.apache.org/jira/browse/FLINK-30536
> But SourceOperatorStreamTask.AsyncDataOutputToOutput increments the counter 
> of numRecordsOut too. This results in the source operator's numRecordsOut are 
> doubled.
> We should delete the numRecordsOut.inc in 
> SourceOperatorStreamTask.AsyncDataOutputToOutput.
> [~xtsong][~lindong] Could you please take a look at this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31752) SourceOperatorStreamTask increments numRecordsOut twice

2023-04-07 Thread Weihua Hu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weihua Hu updated FLINK-31752:
--
Description: 
The counter of numRecordsOut was introduce to ChainingOutput to reduce the 
function call stack depth in 
https://issues.apache.org/jira/browse/FLINK-30536

But SourceOperatorStreamTask.AsyncDataOutputToOutput increments the counter of 
numRecordsOut too. This results in the source operator's numRecordsOut are 
doubled.

We should delete the numRecordsOut.inc in 
SourceOperatorStreamTask.AsyncDataOutputToOutput


  was:
The counter of numRecordsOut was introduce to ChainingOutput to reduce the 
function call stack depth in 
https://issues.apache.org/jira/browse/FLINK-30536

But SourceOperatorStreamTask.AsyncDataOutputToOutput increments the counter of 
numRecordsOut too. This results in the source operator's numRecordsOut are 
doubled.

We should delete the numRecordsOut.inc in 
SourceOperatorStreamTask.AsyncDataOutputToOutput

 !image-2023-04-07-15-51-44-304.png! 


> SourceOperatorStreamTask increments numRecordsOut twice
> ---
>
> Key: FLINK-31752
> URL: https://issues.apache.org/jira/browse/FLINK-31752
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.17.0
>Reporter: Weihua Hu
>Priority: Major
> Attachments: image-2023-04-07-15-51-44-304.png
>
>
> The counter of numRecordsOut was introduce to ChainingOutput to reduce the 
> function call stack depth in 
> https://issues.apache.org/jira/browse/FLINK-30536
> But SourceOperatorStreamTask.AsyncDataOutputToOutput increments the counter 
> of numRecordsOut too. This results in the source operator's numRecordsOut are 
> doubled.
> We should delete the numRecordsOut.inc in 
> SourceOperatorStreamTask.AsyncDataOutputToOutput



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31752) SourceOperatorStreamTask increments numRecordsOut twice

2023-04-07 Thread Weihua Hu (Jira)
Weihua Hu created FLINK-31752:
-

 Summary: SourceOperatorStreamTask increments numRecordsOut twice
 Key: FLINK-31752
 URL: https://issues.apache.org/jira/browse/FLINK-31752
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.17.0
Reporter: Weihua Hu
 Attachments: image-2023-04-07-15-51-44-304.png

The counter of numRecordsOut was introduce to ChainingOutput to reduce the 
function call stack depth in 
https://issues.apache.org/jira/browse/FLINK-30536

But SourceOperatorStreamTask.AsyncDataOutputToOutput increments the counter of 
numRecordsOut too. This results in the source operator's numRecordsOut are 
doubled.

We should delete the numRecordsOut.inc in 
SourceOperatorStreamTask.AsyncDataOutputToOutput

 !image-2023-04-07-15-51-44-304.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31751) array return type SpecificTypeStrategies.ARRAY and ifThenElse return type is not correct

2023-04-07 Thread jackylau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709612#comment-17709612
 ] 

jackylau commented on FLINK-31751:
--

hi [~twalthr] [~jark] what do you think?

> array return type SpecificTypeStrategies.ARRAY and ifThenElse return type is 
> not correct
> 
>
> Key: FLINK-31751
> URL: https://issues.apache.org/jira/browse/FLINK-31751
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: jackylau
>Priority: Major
> Fix For: 1.18.0
>
>
> like array return type
> Type strategy that returns a \{@link DataTypes#ARRAY(DataType)} with element 
> type equal to the type of the first argument, which is not equals calcite 
> semantic.
> for example
> {code:java}
> ARRAY and ARRAY NOT NULL
> it should return  ARRAY instead of ARRAY{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31751) array return type SpecificTypeStrategies.ARRAY and ifThenElse return type is not correct

2023-04-07 Thread jackylau (Jira)
jackylau created FLINK-31751:


 Summary: array return type SpecificTypeStrategies.ARRAY and 
ifThenElse return type is not correct
 Key: FLINK-31751
 URL: https://issues.apache.org/jira/browse/FLINK-31751
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: jackylau
 Fix For: 1.18.0


like array return type

Type strategy that returns a \{@link DataTypes#ARRAY(DataType)} with element 
type equal to the type of the first argument, which is not equals calcite 
semantic.

for example
{code:java}
ARRAY and ARRAY NOT NULL
it should return  ARRAY instead of ARRAY{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] senlizishi closed pull request #19800: [docs]update User-defined Sources & Sinks document

2023-04-07 Thread via GitHub


senlizishi closed pull request #19800:  [docs]update User-defined Sources & 
Sinks document
URL: https://github.com/apache/flink/pull/19800


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] KinoMin commented on pull request #22103: [FLINK-28825] Add K8S pod scheduler into Kubernetes options [FLINK-28829] Support prepreparing K8S resources before JM creation [FLINK-28831]

2023-04-07 Thread via GitHub


KinoMin commented on PR #22103:
URL: https://github.com/apache/flink/pull/22103#issuecomment-1499971995

   We are still using flink1.12, what should I do for this pr I want to use in 
1.12, I try to merge this pr in 1.12, and find that the difference is too big 
and it is difficult to merge, if you can, I hope you can guide me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org