Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]

2023-12-06 Thread via GitHub


dawidwys closed pull request #23869: [FLINK-33470] Implement restore tests for 
Join node
URL: https://github.com/apache/flink/pull/23869





[jira] [Commented] (FLINK-33470) Implement restore tests for Join node

2023-12-06 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793565#comment-17793565
 ] 

Dawid Wysakowicz commented on FLINK-33470:
--

Fixed in 
2c7f2d4bca25f923f591b20ed7fb93a40acfc681..78b0a625f49058d0c0213de49d6337a7d4f2ab72

> Implement restore tests for Join node
> -
>
> Key: FLINK-33470
> URL: https://issues.apache.org/jira/browse/FLINK-33470
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>  Labels: pull-request-available
>






Re: [PR] [FLINK-33667] Implement restore tests for MatchRecognize node [flink]

2023-12-06 Thread via GitHub


dawidwys merged PR #23821:
URL: https://github.com/apache/flink/pull/23821





[jira] [Closed] (FLINK-33667) Implement restore tests for MatchRecognize node

2023-12-06 Thread Dawid Wysakowicz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-33667.

Resolution: Implemented

Implemented in 60cc00e5e6abc0b7309a48a37e171dae9fa98183

> Implement restore tests for MatchRecognize node
> ---
>
> Key: FLINK-33667
> URL: https://issues.apache.org/jira/browse/FLINK-33667
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>  Labels: pull-request-available
>






Re: [PR] [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway [flink]

2023-12-06 Thread via GitHub


zoudan commented on code in PR #23849:
URL: https://github.com/apache/flink/pull/23849#discussion_r1416945039


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java:
##
@@ -52,18 +59,49 @@
 /** Test {@link SqlGatewayService}#executeStatement. */
 public class SqlGatewayServiceStatementITCase extends 
AbstractSqlGatewayStatementITCase {
 
-private final SessionEnvironment defaultSessionEnvironment =
+private static final SessionEnvironment DEFAULT_SESSION_ENVIRONMENT =
+SessionEnvironment.newBuilder()
+.setSessionEndpointVersion(MockedEndpointVersion.V1)
+.build();
+
+private static final SessionEnvironment 
SESSION_ENVIRONMENT_WITH_PLAN_CACHE_ENABLED =
 SessionEnvironment.newBuilder()
 .setSessionEndpointVersion(MockedEndpointVersion.V1)
+.addSessionConfig(
+Collections.singletonMap(
+
SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED.key(), "true"))
 .build();
 
 private SessionHandle sessionHandle;
 
+@Parameters(name = "parameters={0}")
+public static List<StatementTestParameters> parameters() throws Exception {
+return listFlinkSqlTests().stream()
+.map(path -> new StatementTestParameters(path, path.endsWith("repeated_dql.q")))
+.collect(Collectors.toList());
+}
+
 @BeforeEach
 @Override
 public void before(@TempDir Path temporaryFolder) throws Exception {
 super.before(temporaryFolder);
-sessionHandle = service.openSession(defaultSessionEnvironment);
+SessionEnvironment sessionEnvironment =
+isPlanCacheEnabled()
+? SESSION_ENVIRONMENT_WITH_PLAN_CACHE_ENABLED
+: DEFAULT_SESSION_ENVIRONMENT;
+sessionHandle = service.openSession(sessionEnvironment);
+}
+
+@AfterEach
+public void after() {
+if (isPlanCacheEnabled()) {
+CacheStats cacheStats =
+((SqlGatewayServiceImpl) service)
+.getSession(sessionHandle)
+.getPlanCacheManager()
+.getCacheStats();
+assertThat(cacheStats).isEqualTo(new CacheStats(4, 14, 0, 0, 0, 
0));

Review Comment:
   This assertion affects `repeated_dql.q`, as we only enable the plan cache for it. 
So we only have to modify the expected value when we change the queries in 
`repeated_dql.q`.






Re: [PR] [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway [flink]

2023-12-06 Thread via GitHub


zoudan commented on code in PR #23849:
URL: https://github.com/apache/flink/pull/23849#discussion_r1416948569


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlanCacheManager.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheStats;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/** This manages all the plan caches. */
+@Internal
+public class PlanCacheManager {
+
+private final Cache<String, CachedPlan> planCache;
+
+public PlanCacheManager(long maximumCapacity, Duration ttl) {
+planCache =
+CacheBuilder.newBuilder()
+.maximumSize(maximumCapacity)
+.expireAfterWrite(ttl)
+.recordStats()
+.build();
+}
+
+public Optional<CachedPlan> getPlan(String query) {
+CachedPlan cachedPlan = planCache.getIfPresent(query);
+return Optional.ofNullable(cachedPlan);
+}
+
+public void putPlan(String query, CachedPlan cachedPlan) {
+Preconditions.checkNotNull(query, "query can not be null");
+Preconditions.checkNotNull(cachedPlan, "cachedPlan can not be null");
+planCache.put(query, cachedPlan);
+}
+
+public void invalidateAll() {
+planCache.invalidateAll();
+}
+
+public CacheStats getCacheStats() {

Review Comment:
   It is only used in tests for now, but it may be used in other cases, e.g. monitoring 
the cache stats. I am not sure whether we should add `@VisibleForTesting` for 
now.
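
   As a loose illustration of the monitoring use case mentioned above (not part of this PR), the stats returned by `getCacheStats()` could be turned into a log line or gauge values. Guava's `CacheStats` accessors are real; the reporter class and its wiring are hypothetical:

   ```java
   import org.apache.flink.shaded.guava31.com.google.common.cache.CacheStats;

   /** Hypothetical sketch: summarizing PlanCacheManager#getCacheStats() for monitoring. */
   public final class PlanCacheStatsReporter {

       /** Formats a one-line summary that could be logged or exported as gauges. */
       public static String summarize(CacheStats stats) {
           return String.format(
                   "planCache: requests=%d hits=%d misses=%d hitRate=%.2f evictions=%d",
                   stats.requestCount(),
                   stats.hitCount(),
                   stats.missCount(),
                   stats.hitRate(),
                   stats.evictionCount());
       }
   }
   ```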






Re: [PR] [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway [flink]

2023-12-06 Thread via GitHub


libenchao commented on code in PR #23849:
URL: https://github.com/apache/flink/pull/23849#discussion_r1416954073


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java:
##
@@ -52,18 +59,49 @@
 /** Test {@link SqlGatewayService}#executeStatement. */
 public class SqlGatewayServiceStatementITCase extends 
AbstractSqlGatewayStatementITCase {
 
-private final SessionEnvironment defaultSessionEnvironment =
+private static final SessionEnvironment DEFAULT_SESSION_ENVIRONMENT =
+SessionEnvironment.newBuilder()
+.setSessionEndpointVersion(MockedEndpointVersion.V1)
+.build();
+
+private static final SessionEnvironment 
SESSION_ENVIRONMENT_WITH_PLAN_CACHE_ENABLED =
 SessionEnvironment.newBuilder()
 .setSessionEndpointVersion(MockedEndpointVersion.V1)
+.addSessionConfig(
+Collections.singletonMap(
+
SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED.key(), "true"))
 .build();
 
 private SessionHandle sessionHandle;
 
+@Parameters(name = "parameters={0}")
+public static List<StatementTestParameters> parameters() throws Exception {
+return listFlinkSqlTests().stream()
+.map(path -> new StatementTestParameters(path, path.endsWith("repeated_dql.q")))
+.collect(Collectors.toList());
+}
+
 @BeforeEach
 @Override
 public void before(@TempDir Path temporaryFolder) throws Exception {
 super.before(temporaryFolder);
-sessionHandle = service.openSession(defaultSessionEnvironment);
+SessionEnvironment sessionEnvironment =
+isPlanCacheEnabled()
+? SESSION_ENVIRONMENT_WITH_PLAN_CACHE_ENABLED
+: DEFAULT_SESSION_ENVIRONMENT;
+sessionHandle = service.openSession(sessionEnvironment);
+}
+
+@AfterEach
+public void after() {
+if (isPlanCacheEnabled()) {
+CacheStats cacheStats =
+((SqlGatewayServiceImpl) service)
+.getSession(sessionHandle)
+.getPlanCacheManager()
+.getCacheStats();
+assertThat(cacheStats).isEqualTo(new CacheStats(4, 14, 0, 0, 0, 
0));

Review Comment:
   That sounds good, then let's keep it.






[jira] [Updated] (FLINK-33753) ContinuousFileReaderOperator consume records as mini batch

2023-12-06 Thread Prabhu Joseph (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prabhu Joseph updated FLINK-33753:
--
Affects Version/s: 1.16.0
   (was: 1.18.0)

> ContinuousFileReaderOperator consume records as mini batch
> --
>
> Key: FLINK-33753
> URL: https://issues.apache.org/jira/browse/FLINK-33753
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.16.0
>Reporter: Prabhu Joseph
>Priority: Major
>
> The ContinuousFileReaderOperator reads and collects the records from a split 
> in a loop. If the split size is large, then the loop will take more time, and 
> then the mailbox executor won't have a chance to process the checkpoint 
> barrier. This leads to checkpoint timing out. ContinuousFileReaderOperator 
> could be improved to consume the records in a mini batch, similar to Hudi's 
> StreamReadOperator (https://issues.apache.org/jira/browse/HUDI-2485).
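
To make the proposed improvement concrete, below is a minimal, self-contained sketch of the mini-batch idea (not the actual ContinuousFileReaderOperator code): read at most a bounded number of records per mailbox action and then re-enqueue the continuation, so the checkpoint barrier can be processed in between. The SplitReader interface and the plain Executor standing in for Flink's mailbox executor are assumptions made for illustration.

{code:java}
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Hypothetical record source for a single split. */
interface SplitReader<T> {
    /** Returns the next record, or null when the split is exhausted. */
    T nextRecord();
}

/** Illustrative only: consumes a split in bounded chunks instead of one tight loop. */
final class MiniBatchSplitConsumer<T> {

    private static final int BATCH_SIZE = 1_000; // assumed batch size

    private final SplitReader<T> reader;
    private final Consumer<T> output;
    private final Executor mailbox; // stands in for the operator's mailbox executor

    MiniBatchSplitConsumer(SplitReader<T> reader, Consumer<T> output, Executor mailbox) {
        this.reader = reader;
        this.output = output;
        this.mailbox = mailbox;
    }

    /** Emits up to BATCH_SIZE records, then yields by re-enqueuing itself. */
    void processBatch() {
        for (int i = 0; i < BATCH_SIZE; i++) {
            T record = reader.nextRecord();
            if (record == null) {
                return; // split finished
            }
            output.accept(record);
        }
        // Instead of looping over the whole split, hand control back so other mails
        // (e.g. checkpoint barrier handling) can run before the next batch.
        mailbox.execute(this::processBatch);
    }
}
{code}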





Re: [PR] [FLINK-33502][network] Prevent DiskTierProducerAgent#getSegmentIdByIndexOfFirstBufferInSegment from throwing NPE when the task is released [flink]

2023-12-06 Thread via GitHub


reswqa merged PR #23863:
URL: https://github.com/apache/flink/pull/23863





Re: [PR] [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway [flink]

2023-12-06 Thread via GitHub


libenchao commented on code in PR #23849:
URL: https://github.com/apache/flink/pull/23849#discussion_r1416956966


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlanCacheManager.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheStats;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/** This manages all the plan caches. */
+@Internal
+public class PlanCacheManager {
+
+private final Cache<String, CachedPlan> planCache;
+
+public PlanCacheManager(long maximumCapacity, Duration ttl) {
+planCache =
+CacheBuilder.newBuilder()
+.maximumSize(maximumCapacity)
+.expireAfterWrite(ttl)
+.recordStats()
+.build();
+}
+
+public Optional<CachedPlan> getPlan(String query) {
+CachedPlan cachedPlan = planCache.getIfPresent(query);
+return Optional.ofNullable(cachedPlan);
+}
+
+public void putPlan(String query, CachedPlan cachedPlan) {
+Preconditions.checkNotNull(query, "query can not be null");
+Preconditions.checkNotNull(cachedPlan, "cachedPlan can not be null");
+planCache.put(query, cachedPlan);
+}
+
+public void invalidateAll() {
+planCache.invalidateAll();
+}
+
+public CacheStats getCacheStats() {

Review Comment:
   Then we can remove it once it is actually used somewhere other than tests.






[jira] [Resolved] (FLINK-33502) HybridShuffleITCase caused a fatal error

2023-12-06 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo resolved FLINK-33502.

Resolution: Fixed

> HybridShuffleITCase caused a fatal error
> 
>
> Key: FLINK-33502
> URL: https://issues.apache.org/jira/browse/FLINK-33502
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
> Attachments: image-2023-11-20-14-37-37-321.png
>
>
> [https://github.com/XComp/flink/actions/runs/6789774296/job/18458197040#step:12:9177]
> {code:java}
> Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, check 
> output in log
> 9168Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239
> 9169Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests:
> 9170Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.flink.test.runtime.HybridShuffleITCase
> 9171Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: 
> ExecutionException The forked VM terminated without properly saying goodbye. 
> VM crash or System.exit called?
> 9172Error: 21:21:35 21:21:35.379 [ERROR] Command was /bin/sh -c cd 
> /root/flink/flink-tests && /usr/lib/jvm/jdk-11.0.19+7/bin/java -XX:+UseG1GC 
> -Xms256m -XX:+IgnoreUnrecognizedVMOptions 
> --add-opens=java.base/java.util=ALL-UNNAMED 
> --add-opens=java.base/java.io=ALL-UNNAMED -Xmx1536m -jar 
> /root/flink/flink-tests/target/surefire/surefirebooter10811559899200556131.jar
>  /root/flink/flink-tests/target/surefire 2023-11-07T20-32-50_466-jvmRun4 
> surefire6242806641230738408tmp surefire_1603959900047297795160tmp
> 9173Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, 
> check output in log
> 9174Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239
> 9175Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests:
> 9176Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.flink.test.runtime.HybridShuffleITCase
> 9177Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:532)
> 9178Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:479)
> 9179Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:322)
> 9180Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:266)
> [...] {code}





[jira] [Commented] (FLINK-33502) HybridShuffleITCase caused a fatal error

2023-12-06 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793578#comment-17793578
 ] 

Weijie Guo commented on FLINK-33502:


master(1.19) via 6db32a3374a3fdb34c47039c6240c3845a3e1e30.

> HybridShuffleITCase caused a fatal error
> 
>
> Key: FLINK-33502
> URL: https://issues.apache.org/jira/browse/FLINK-33502
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
> Attachments: image-2023-11-20-14-37-37-321.png
>
>
> [https://github.com/XComp/flink/actions/runs/6789774296/job/18458197040#step:12:9177]
> {code:java}
> Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, check 
> output in log
> 9168Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239
> 9169Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests:
> 9170Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.flink.test.runtime.HybridShuffleITCase
> 9171Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: 
> ExecutionException The forked VM terminated without properly saying goodbye. 
> VM crash or System.exit called?
> 9172Error: 21:21:35 21:21:35.379 [ERROR] Command was /bin/sh -c cd 
> /root/flink/flink-tests && /usr/lib/jvm/jdk-11.0.19+7/bin/java -XX:+UseG1GC 
> -Xms256m -XX:+IgnoreUnrecognizedVMOptions 
> --add-opens=java.base/java.util=ALL-UNNAMED 
> --add-opens=java.base/java.io=ALL-UNNAMED -Xmx1536m -jar 
> /root/flink/flink-tests/target/surefire/surefirebooter10811559899200556131.jar
>  /root/flink/flink-tests/target/surefire 2023-11-07T20-32-50_466-jvmRun4 
> surefire6242806641230738408tmp surefire_1603959900047297795160tmp
> 9173Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, 
> check output in log
> 9174Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239
> 9175Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests:
> 9176Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.flink.test.runtime.HybridShuffleITCase
> 9177Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:532)
> 9178Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:479)
> 9179Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:322)
> 9180Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:266)
> [...] {code}





Re: [PR] [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway [flink]

2023-12-06 Thread via GitHub


zoudan commented on code in PR #23849:
URL: https://github.com/apache/flink/pull/23849#discussion_r1416957349


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java:
##
@@ -52,18 +59,49 @@
 /** Test {@link SqlGatewayService}#executeStatement. */
 public class SqlGatewayServiceStatementITCase extends 
AbstractSqlGatewayStatementITCase {
 
-private final SessionEnvironment defaultSessionEnvironment =
+private static final SessionEnvironment DEFAULT_SESSION_ENVIRONMENT =
+SessionEnvironment.newBuilder()
+.setSessionEndpointVersion(MockedEndpointVersion.V1)
+.build();
+
+private static final SessionEnvironment 
SESSION_ENVIRONMENT_WITH_PLAN_CACHE_ENABLED =
 SessionEnvironment.newBuilder()
 .setSessionEndpointVersion(MockedEndpointVersion.V1)
+.addSessionConfig(
+Collections.singletonMap(
+
SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED.key(), "true"))
 .build();
 
 private SessionHandle sessionHandle;
 
+@Parameters(name = "parameters={0}")
+public static List<StatementTestParameters> parameters() throws Exception {
+return listFlinkSqlTests().stream()
+.map(path -> new StatementTestParameters(path, path.endsWith("repeated_dql.q")))
+.collect(Collectors.toList());
+}
+
 @BeforeEach
 @Override
 public void before(@TempDir Path temporaryFolder) throws Exception {
 super.before(temporaryFolder);
-sessionHandle = service.openSession(defaultSessionEnvironment);
+SessionEnvironment sessionEnvironment =
+isPlanCacheEnabled()
+? SESSION_ENVIRONMENT_WITH_PLAN_CACHE_ENABLED
+: DEFAULT_SESSION_ENVIRONMENT;
+sessionHandle = service.openSession(sessionEnvironment);
+}
+
+@AfterEach
+public void after() {
+if (isPlanCacheEnabled()) {
+CacheStats cacheStats =
+((SqlGatewayServiceImpl) service)
+.getSession(sessionHandle)
+.getPlanCacheManager()
+.getCacheStats();
+assertThat(cacheStats).isEqualTo(new CacheStats(4, 14, 0, 0, 0, 
0));

Review Comment:
   How about we just check the value of `hitCount`? Then we only have to modify 
the expected value when we add, delete, or modify a query that has an impact on 
cache hits.
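
   For reference, checking only the hit count could look like the sketch below. Assuming Guava's `CacheStats` constructor takes the hit count as its first argument, the value corresponding to `new CacheStats(4, 14, 0, 0, 0, 0)` above would be 4:

   ```java
   // Sketch: assert only the hit count instead of the full CacheStats tuple.
   CacheStats cacheStats =
           ((SqlGatewayServiceImpl) service)
                   .getSession(sessionHandle)
                   .getPlanCacheManager()
                   .getCacheStats();
   // 4 is the hit count under Guava's argument order; adjust if repeated_dql.q changes.
   assertThat(cacheStats.hitCount()).isEqualTo(4);
   ```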






[jira] [Closed] (FLINK-33502) HybridShuffleITCase caused a fatal error

2023-12-06 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-33502.
--

> HybridShuffleITCase caused a fatal error
> 
>
> Key: FLINK-33502
> URL: https://issues.apache.org/jira/browse/FLINK-33502
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
> Attachments: image-2023-11-20-14-37-37-321.png
>
>
> [https://github.com/XComp/flink/actions/runs/6789774296/job/18458197040#step:12:9177]
> {code:java}
> Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, check 
> output in log
> 9168Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239
> 9169Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests:
> 9170Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.flink.test.runtime.HybridShuffleITCase
> 9171Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.maven.surefire.booter.SurefireBooterForkException: 
> ExecutionException The forked VM terminated without properly saying goodbye. 
> VM crash or System.exit called?
> 9172Error: 21:21:35 21:21:35.379 [ERROR] Command was /bin/sh -c cd 
> /root/flink/flink-tests && /usr/lib/jvm/jdk-11.0.19+7/bin/java -XX:+UseG1GC 
> -Xms256m -XX:+IgnoreUnrecognizedVMOptions 
> --add-opens=java.base/java.util=ALL-UNNAMED 
> --add-opens=java.base/java.io=ALL-UNNAMED -Xmx1536m -jar 
> /root/flink/flink-tests/target/surefire/surefirebooter10811559899200556131.jar
>  /root/flink/flink-tests/target/surefire 2023-11-07T20-32-50_466-jvmRun4 
> surefire6242806641230738408tmp surefire_1603959900047297795160tmp
> 9173Error: 21:21:35 21:21:35.379 [ERROR] Error occurred in starting fork, 
> check output in log
> 9174Error: 21:21:35 21:21:35.379 [ERROR] Process Exit Code: 239
> 9175Error: 21:21:35 21:21:35.379 [ERROR] Crashed tests:
> 9176Error: 21:21:35 21:21:35.379 [ERROR] 
> org.apache.flink.test.runtime.HybridShuffleITCase
> 9177Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:532)
> 9178Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:479)
> 9179Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:322)
> 9180Error: 21:21:35 21:21:35.379 [ERROR]  at 
> org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:266)
> [...] {code}





Re: [PR] [FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway [flink]

2023-12-06 Thread via GitHub


libenchao commented on code in PR #23849:
URL: https://github.com/apache/flink/pull/23849#discussion_r1416960738


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java:
##
@@ -52,18 +59,49 @@
 /** Test {@link SqlGatewayService}#executeStatement. */
 public class SqlGatewayServiceStatementITCase extends 
AbstractSqlGatewayStatementITCase {
 
-private final SessionEnvironment defaultSessionEnvironment =
+private static final SessionEnvironment DEFAULT_SESSION_ENVIRONMENT =
+SessionEnvironment.newBuilder()
+.setSessionEndpointVersion(MockedEndpointVersion.V1)
+.build();
+
+private static final SessionEnvironment 
SESSION_ENVIRONMENT_WITH_PLAN_CACHE_ENABLED =
 SessionEnvironment.newBuilder()
 .setSessionEndpointVersion(MockedEndpointVersion.V1)
+.addSessionConfig(
+Collections.singletonMap(
+
SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED.key(), "true"))
 .build();
 
 private SessionHandle sessionHandle;
 
+@Parameters(name = "parameters={0}")
+public static List<StatementTestParameters> parameters() throws Exception {
+return listFlinkSqlTests().stream()
+.map(path -> new StatementTestParameters(path, path.endsWith("repeated_dql.q")))
+.collect(Collectors.toList());
+}
+
 @BeforeEach
 @Override
 public void before(@TempDir Path temporaryFolder) throws Exception {
 super.before(temporaryFolder);
-sessionHandle = service.openSession(defaultSessionEnvironment);
+SessionEnvironment sessionEnvironment =
+isPlanCacheEnabled()
+? SESSION_ENVIRONMENT_WITH_PLAN_CACHE_ENABLED
+: DEFAULT_SESSION_ENVIRONMENT;
+sessionHandle = service.openSession(sessionEnvironment);
+}
+
+@AfterEach
+public void after() {
+if (isPlanCacheEnabled()) {
+CacheStats cacheStats =
+((SqlGatewayServiceImpl) service)
+.getSession(sessionHandle)
+.getPlanCacheManager()
+.getCacheStats();
+assertThat(cacheStats).isEqualTo(new CacheStats(4, 14, 0, 0, 0, 
0));

Review Comment:
   I don't have a strong opinion on it; `repeated_dql.q` is a dedicated test 
for verifying the cache, so it's OK to bind it to the test expectation.






[jira] [Updated] (FLINK-33268) Flink REST API response parsing should support backward compatible changes like new fields

2023-12-06 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated FLINK-33268:
--
Description: 
At the moment Flink is not ignoring unknown fields when parsing REST responses. 
An example for such a class is JobDetailsInfo but this applies to all others. 
It would be good to add this support to increase compatibility.

The real life use-case is when the operator wants to handle 2 jobs with 2 
different Flink versions where the newer version has added a new field to any 
REST response. Such case the operator gets an exception when tries to poll the 
job details with the additional field.

  was:At the moment Flink is not ignoring unknown fields when parsing REST 
responses. An example for such a class is JobDetailsInfo but this applies to 
all others. It would be good to add this support to increase compatibility.


> Flink REST API response parsing should support backward compatible changes 
> like new fields
> --
>
> Key: FLINK-33268
> URL: https://issues.apache.org/jira/browse/FLINK-33268
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Gabor Somogyi
>Priority: Major
>
> At the moment Flink is not ignoring unknown fields when parsing REST 
> responses. An example for such a class is JobDetailsInfo but this applies to 
> all others. It would be good to add this support to increase compatibility.
> The real life use-case is when the operator wants to handle 2 jobs with 2 
> different Flink versions where the newer version has added a new field to any 
> REST response. Such case the operator gets an exception when tries to poll 
> the job details with the additional field.





[jira] [Updated] (FLINK-33268) Flink REST API response parsing should support backward compatible changes like new fields

2023-12-06 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated FLINK-33268:
--
Description: 
At the moment Flink is not ignoring unknown fields when parsing REST responses. 
An example for such a class is JobDetailsInfo but this applies to all others. 
It would be good to add this support to increase compatibility.

The real life use-case is when the Flink k8s operator wants to handle 2 jobs 
with 2 different Flink versions where the newer version has added a new field 
to any REST response. Such case the operator gets an exception when tries to 
poll the job details with the additional field.

  was:
At the moment Flink is not ignoring unknown fields when parsing REST responses. 
An example for such a class is JobDetailsInfo but this applies to all others. 
It would be good to add this support to increase compatibility.

The real life use-case is when the operator wants to handle 2 jobs with 2 
different Flink versions where the newer version has added a new field to any 
REST response. Such case the operator gets an exception when tries to poll the 
job details with the additional field.


> Flink REST API response parsing should support backward compatible changes 
> like new fields
> --
>
> Key: FLINK-33268
> URL: https://issues.apache.org/jira/browse/FLINK-33268
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Gabor Somogyi
>Priority: Major
>
> At the moment Flink is not ignoring unknown fields when parsing REST 
> responses. An example for such a class is JobDetailsInfo but this applies to 
> all others. It would be good to add this support to increase compatibility.
> The real life use-case is when the Flink k8s operator wants to handle 2 jobs 
> with 2 different Flink versions where the newer version has added a new field 
> to any REST response. Such case the operator gets an exception when tries to 
> poll the job details with the additional field.





[jira] [Commented] (FLINK-33666) MergeTableLikeUtil uses different constraint name than Schema

2023-12-06 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793590#comment-17793590
 ] 

Jeyhun Karimov commented on FLINK-33666:


Hi [~twalthr] can you please assign this task to me or give me access to 
self-assign tasks?

> MergeTableLikeUtil uses different constraint name than Schema
> -
>
> Key: FLINK-33666
> URL: https://issues.apache.org/jira/browse/FLINK-33666
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> {{MergeTableLikeUtil}} uses a different algorithm to name constraints than 
> {{Schema}}. 
> {{Schema}} includes the column names.
> {{MergeTableLikeUtil}} uses a hashCode which means it might depend on JVM 
> specifics.
> For consistency we should use the same algorithm. I propose to use 
> {{Schema}}'s logic.
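
For illustration only (neither helper below is the actual Flink code), the difference boils down to deriving the constraint name deterministically from the column names versus deriving it from a hash:

{code:java}
import java.util.List;

/** Hypothetical naming helpers contrasting the two approaches described above. */
final class ConstraintNames {

    /** Column-name based: stable and human readable, in the spirit of Schema's logic. */
    static String fromColumns(List<String> primaryKeyColumns) {
        return "PK_" + String.join("_", primaryKeyColumns);
    }

    /** Hash based: the name carries no column information and is harder to reason about. */
    static String fromHash(List<String> primaryKeyColumns) {
        return "PK_" + Math.abs(primaryKeyColumns.hashCode());
    }
}
{code}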





[jira] [Updated] (FLINK-33268) Flink REST API response parsing should support backward compatible changes like new fields

2023-12-06 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated FLINK-33268:
--
Description: 
At the moment Flink is not ignoring unknown fields when parsing REST responses. 
An example for such a class is JobDetailsInfo but this applies to all others. 
It would be good to add this support to increase compatibility.

The real life use-case is when the Flink k8s operator wants to handle 2 jobs 
with 2 different Flink versions where the newer version has added a new field 
to any REST response. Such case the operator gets an exception when for example 
it tries to poll the job details with the additional field.

  was:
At the moment Flink is not ignoring unknown fields when parsing REST responses. 
An example for such a class is JobDetailsInfo but this applies to all others. 
It would be good to add this support to increase compatibility.

The real life use-case is when the Flink k8s operator wants to handle 2 jobs 
with 2 different Flink versions where the newer version has added a new field 
to any REST response. Such case the operator gets an exception when tries to 
poll the job details with the additional field.


> Flink REST API response parsing should support backward compatible changes 
> like new fields
> --
>
> Key: FLINK-33268
> URL: https://issues.apache.org/jira/browse/FLINK-33268
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Gabor Somogyi
>Priority: Major
>
> At the moment Flink is not ignoring unknown fields when parsing REST 
> responses. An example for such a class is JobDetailsInfo but this applies to 
> all others. It would be good to add this support to increase compatibility.
> The real life use-case is when the Flink k8s operator wants to handle 2 jobs 
> with 2 different Flink versions where the newer version has added a new field 
> to any REST response. Such case the operator gets an exception when for 
> example it tries to poll the job details with the additional field.





Re: [PR] [FLINK-33433][rest] Introduce async-profiler to support profiling Job… [flink]

2023-12-06 Thread via GitHub


yuchen-ecnu commented on code in PR #23820:
URL: https://github.com/apache/flink/pull/23820#discussion_r1416993727


##
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ProfilingService.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import one.profiler.AsyncProfiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/** Create and keep profiling requests with rolling policy. */
+public class ProfilingService implements Closeable {

Review Comment:
   I have added UTs for `ProfilingService` to ensure that it works as expected.






[jira] [Updated] (FLINK-33268) Flink REST API response parsing throws exception on new fields

2023-12-06 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated FLINK-33268:
--
Summary: Flink REST API response parsing throws exception on new fields  
(was: Flink REST API response parsing should support backward compatible 
changes like new fields)

> Flink REST API response parsing throws exception on new fields
> --
>
> Key: FLINK-33268
> URL: https://issues.apache.org/jira/browse/FLINK-33268
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.19.0
>Reporter: Gabor Somogyi
>Priority: Major
>
> At the moment Flink is not ignoring unknown fields when parsing REST 
> responses. An example for such a class is JobDetailsInfo but this applies to 
> all others. It would be good to add this support to increase compatibility.
> The real life use-case is when the Flink k8s operator wants to handle 2 jobs 
> with 2 different Flink versions where the newer version has added a new field 
> to any REST response. Such case the operator gets an exception when for 
> example it tries to poll the job details with the additional field.
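
The generic Jackson-level fix being asked for here, sketched with plain Jackson APIs (not Flink's actual REST mapper setup), is to stop failing on unknown properties during deserialization:

{code:java}
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

final class LenientRestMappers {

    /** A mapper that tolerates fields added by a newer server version instead of throwing. */
    static ObjectMapper lenient() {
        return new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    // Alternatively, a single response class can opt in (annotation shown for illustration):
    // @JsonIgnoreProperties(ignoreUnknown = true)
    // public class JobDetailsInfo { ... }
}
{code}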





[PR] [FLINK-33599] Run restore tests with RocksDB state backend [flink]

2023-12-06 Thread via GitHub


dawidwys opened a new pull request, #23883:
URL: https://github.com/apache/flink/pull/23883

   ## What is the purpose of the change
   
   The PR adjusts `RestoreTestBase` to run with the `RocksDB` state backend, 
which is used more often in production workloads.
   
   ## Verifying this change
   
   All existing tests pass.
   You can run the tests and verify that only `EmbeddedRocksDBStateBackend` is used 
by `RestoreTestBase`.
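   
   For context, switching a test's execution environment to RocksDB roughly amounts to the sketch below; the helper class is illustrative and not the actual `RestoreTestBase` change:
   
   ```java
   import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   final class RocksDbTestEnvironments {

       /** Illustrative helper: an environment using RocksDB with incremental checkpoints. */
       static StreamExecutionEnvironment withRocksDb() {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints
           return env;
       }
   }
   ```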
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   





[jira] [Updated] (FLINK-33599) Run restore tests with RocksDB state backend

2023-12-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33599:
---
Labels: pull-request-available  (was: )

> Run restore tests with RocksDB state backend
> 
>
> Key: FLINK-33599
> URL: https://issues.apache.org/jira/browse/FLINK-33599
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>






Re: [PR] [FLINK-33691] Support agg push down for 'count(*)/count(1)/count(column not null)' [flink]

2023-12-06 Thread via GitHub


fsk119 merged PR #23828:
URL: https://github.com/apache/flink/pull/23828





[jira] [Closed] (FLINK-33691) Support agg push down for 'count(*)/count(1)/count(column not null)'

2023-12-06 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang closed FLINK-33691.
-
Resolution: Implemented

> Support agg push down for 'count(*)/count(1)/count(column not null)'
> 
>
> Key: FLINK-33691
> URL: https://issues.apache.org/jira/browse/FLINK-33691
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Now,  PushLocalAggIntoScanRule cannot push down 'count( * 
> )/count(1)/count(column not null)', but it can push down count(column 
> nullable). The reason is that count( * ) and count( 1 ) will be optimized to 
> a scan with calc as '0 AS $f0' to reduce read cost, which will not match the 
> push down rule





[jira] [Commented] (FLINK-33691) Support agg push down for 'count(*)/count(1)/count(column not null)'

2023-12-06 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793592#comment-17793592
 ] 

Shengkai Fang commented on FLINK-33691:
---

Merged into master: 3060ccd49cc8d19634b431dbf0f09ac875d0d422

> Support agg push down for 'count(*)/count(1)/count(column not null)'
> 
>
> Key: FLINK-33691
> URL: https://issues.apache.org/jira/browse/FLINK-33691
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Yunhong Zheng
>Assignee: Yunhong Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Now,  PushLocalAggIntoScanRule cannot push down 'count( * 
> )/count(1)/count(column not null)', but it can push down count(column 
> nullable). The reason is that count( * ) and count( 1 ) will be optimized to 
> a scan with calc as '0 AS $f0' to reduce read cost, which will not match the 
> push down rule





Re: [PR] [FLINK-33599] Run restore tests with RocksDB state backend [flink]

2023-12-06 Thread via GitHub


flinkbot commented on PR #23883:
URL: https://github.com/apache/flink/pull/23883#issuecomment-1842526638

   
   ## CI report:
   
   * 53fa1b8bd754312a795cc2c2c80ad3b2f567be26 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-33480] Implement restore tests for GroupAggregate node [flink]

2023-12-06 Thread via GitHub


dawidwys commented on code in PR #23681:
URL: https://github.com/apache/flink/pull/23681#discussion_r1417001274


##
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java:
##
@@ -133,6 +134,12 @@ public Builder consumedAfterRestore(Row... expectedRows) {
 return this;
 }
 
+public Builder ignoreConsumedAfterRestore() {

Review Comment:
   Why do we need that? If you want to ignore the after-restore data, why do you 
add it in the first place?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java:
##
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction;
+import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum1AggFunction;
+import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum2AggFunction;
+import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg;
+import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.time.Duration;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecGroupAggregate}. */
+public class GroupAggregateTestPrograms {
+
+static final SourceTestStep SOURCE_ONE =
+SourceTestStep.newBuilder("source_t")
+.addSchema("a INT", "b BIGINT", "c VARCHAR")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi"),
+Row.of(2, 2L, "Hello"),
+Row.of(2, 2L, "Hello World"))
+.producedAfterRestore(
+Row.of(1, 1L, "Hi Again!"),
+Row.of(2, 2L, "Hello Again!"),
+Row.of(2, 2L, "Hello World Again!"))
+.build();
+
+static final SourceTestStep SOURCE_TWO =
+SourceTestStep.newBuilder("source_t")
+.addSchema("a INT", "b BIGINT", "c INT", "d VARCHAR", "e 
BIGINT")
+.producedBeforeRestore(
+Row.of(2, 3L, 2, "Hello World Like", 1L),
+Row.of(3, 4L, 3, "Hello World Its nice", 2L),
+Row.of(2, 2L, 1, "Hello World", 2L),
+Row.of(1, 1L, 0, "Hello", 1L),
+Row.of(5, 11L, 10, "GHI", 1L),
+Row.of(3, 5L, 4, "ABC", 2L),
+Row.of(4, 10L, 9, "FGH", 2L),
+Row.of(4, 7L, 6, "CDE", 2L),
+Row.of(5, 14L, 13, "JKL", 2L),
+Row.of(4, 9L, 8, "EFG", 1L),
+Row.of(5, 15L, 14, "KLM", 2L),
+Row.of(5, 12L, 11, "HIJ", 3L),
+Row.of(4, 8L, 7, "DEF", 1L),
+Row.of(5, 13L, 12, "IJK", 3L),
+Row.of(3, 6L, 5, "BCD", 3L))
+.producedAfterRestore(
+Row.of(1, 1L, 0, "Hello", 1L),
+Row.of(3, 5L, 4, "ABC", 2L),
+Row.of(4, 10L, 9, "FGH", 2L),
+Row.of(4, 7L, 6, "CDE", 2L),
+Row.of(3, 6L, 5, "BCD", 3L))
+.build();
+
+static final TableTestProgram GROUP_BY_SIMPLE =
+TableTestProgram.of(
+"group-aggregate-simple", "validates basic 
aggregation using group by")
+.setupTableSource(SOURCE_ONE)
+.setupTableSink(
+  

Re: [PR] [FLINK-33757] Implement restore tests for Rank node [flink]

2023-12-06 Thread via GitHub


dawidwys commented on code in PR #23878:
URL: https://github.com/apache/flink/pull/23878#discussion_r1417047962


##
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test/plan/rank-test.json:
##
@@ -81,19 +52,20 @@
 },
 "orderBy" : {
   "fields" : [ {
-"index" : 0,
+"index" : 2,
 "isAscending" : true,
 "nullIsLast" : false
   } ]
 },
 "rankRange" : {
-  "type" : "Variable",
-  "endIndex" : 0
+  "type" : "Constant",
+  "start" : 1,
+  "end" : 1
 },
 "rankStrategy" : {
   "type" : "AppendFast"
 },
-"outputRowNumber" : true,
+"outputRowNumber" : false,

Review Comment:
   Do we test other combinations of those parameters?






Re: [PR] [FLINK-33758] Implement restore tests for TemporalSort node [flink]

2023-12-06 Thread via GitHub


dawidwys commented on code in PR #23879:
URL: https://github.com/apache/flink/pull/23879#discussion_r1417052376


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecTemporalSort}. */
+public class TemporalSortTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:07", 5, 6d, 3f, null, "Hello", "b"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d")
+};
+static final TableTestProgram TEMPORAL_SORT_PROCTIME =
+TableTestProgram.of(
+"temporal-sort-proctime", "validates temporal sort 
node with proctime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"a INT",
+"b BIGINT",
+"c STRING",
+"`proctime` as PROCTIME()")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi"),
+Row.of(2, 2L, "Hello"),
+Row.of(3, 2L, "Hello world"))
+.producedAfterRestore(
+Row.of(4, 1L, "Guten Morgen"),
+Row.of(5, 2L, "Guten Tag"))
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT")
+.consumedBeforeRestore("+I[1]", "+I[2]", 
"+I[3]")
+.consumedAfterRestore("+I[4]", "+I[5]")
+.build())
+.runSql("INSERT INTO sink_t SELECT a from source_t ORDER 
BY proctime")
+.build();
+
+static final TableTestProgram TEMPORAL_SORT_ROWTIME =
+TableTestProgram.of(
+"temporal-sort-rowtime", "validates temporal sort 
node with rowtime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"ts STRING",
+"`int` INT",
+"`double` DOUBLE",
+"`float` FLOAT",
+"`bigdec` DECIMAL(10, 2)",
+"`string` STRING",
+"`name` STRING",
+ 

Re: [PR] [FLINK-33379] Bump CI flink version on flink-connector-elasticsearch [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


pgodowski commented on PR #78:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/78#issuecomment-1842605883

   What is the expected timeline for having flink-connector-elasticsearch7 
[published](https://central.sonatype.com/artifact/org.apache.flink/flink-connector-elasticsearch7/versions) 
with Flink 1.18 support?





[jira] [Commented] (FLINK-27681) Improve the availability of Flink when the RocksDB file is corrupted.

2023-12-06 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793626#comment-17793626
 ] 

Piotr Nowojski commented on FLINK-27681:


{quote}

IIUC, checksum in SST level could guarantee the correctness of local file.

{quote}

Yes, I think we don't need any extra protection against corruption of the local 
files. From the document you shared, RocksDB will throw an error every time we 
try to read a corrupted block.

{quote}

And checksum in filesystem level could guarantee the correctness of uploading 
and downloading.

{quote}

Now I'm not so sure about it. Thinking about it more, checksums on the 
filesystem level or the HDD/SSD level wouldn't protect us from a corruption 
happening after reading the bytes from the local file, but before those bytes are 
acknowledged by the DFS/object store. 

A neat way would be to calculate the checksum locally, when writing the SST 
file to the local file system (the "Full File Checksum Design" from the document 
[~masteryhx] shared?), without any significant overhead (the bytes that we want to 
verify would after all already be in RAM). Next, if we could cheaply verify 
that the file uploaded to the DFS still has the same checksum as computed 
during the creation of that file, we could make sure that, no matter what, we always 
have valid files in the DFS that we can fall back to every time RocksDB detects 
a data corruption when accessing an SST file locally.

It looks like this might be doable in one [1] of two [2] ways, at least 
for S3.

[1] 
[https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#checking-object-integrity-md5]

I don't know if AWS's check against the {{Content-MD5}} field is free. As 
far as I understand it, it could be implemented to be almost free, but the 
docs do not mention that.

[2] 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#trailing-checksums
 

Here the docs are saying that this is for free, but it looks like this is 
calculating a new checksum during the upload process. So the question would be, 
could we retrieve that checksum and compare it against our locally computed one?
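
To make the first option more concrete, here is a minimal sketch (plain Java, not Flink code, and not tied to any particular S3 SDK call) of computing the checksum locally while the SST file is streamed out, assuming the checksum reported by the object store can somehow be obtained afterwards for comparison:
{code:java}
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Minimal sketch: the digest is updated as a side effect of reading the file,
// so the checksum comes almost for free while the bytes are streamed out.
final class LocalChecksumSketch {

    static byte[] md5OfLocalFile(String path) throws IOException, NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        try (InputStream in = new DigestInputStream(new FileInputStream(path), md5)) {
            byte[] buffer = new byte[8 * 1024];
            while (in.read(buffer) != -1) {
                // in a real uploader, these bytes would be written to the DFS/object store here
            }
        }
        return md5.digest();
    }

    // Hypothetical reaction to a mismatch between the local digest and whatever
    // checksum the object store reports (e.g. Content-MD5 / ETag for simple uploads).
    static void verifyUpload(byte[] localMd5, byte[] remoteMd5) {
        if (!MessageDigest.isEqual(localMd5, remoteMd5)) {
            throw new IllegalStateException(
                    "Checksum mismatch between local SST file and its uploaded copy");
        }
    }
}
{code}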

 

[~mayuehappy], if we decide to go in this direction, then the change to fail a 
job after a checksum mismatch during the async phase could be implemented 
easily here: 
{{{}org.apache.flink.runtime.checkpoint.CheckpointFailureManager{}}}. I don't 
think we need an extra ticket for that; a separate commit in the same PR will 
suffice.

> Improve the availability of Flink when the RocksDB file is corrupted.
> -
>
> Key: FLINK-27681
> URL: https://issues.apache.org/jira/browse/FLINK-27681
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Ming Li
>Assignee: Yue Ma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered several times when the RocksDB checksum does not match or 
> the block verification fails when the job is restored. The reason for this 
> situation is generally that there are some problems with the machine where 
> the task is located, which causes the files uploaded to HDFS to be incorrect, 
> but it has been a long time (a dozen minutes to half an hour) when we found 
> this problem. I'm not sure if anyone else has had a similar problem.
> Since this file is referenced by incremental checkpoints for a long time, 
> when the maximum number of checkpoints reserved is exceeded, we can only use 
> this file until it is no longer referenced. When the job failed, it cannot be 
> recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data, because even with manual intervention, it 
> just tries to recover from the existing checkpoint or discard the entire 
> state.
> 3. Can we increase the maximum number of references to a file based on the 
> maximum number of checkpoints reserved? When the number of references exceeds 
> the maximum number of checkpoints -1, the Task side is required to upload a 
> new file for this reference. Not sure if this way will ensure that the new 
> file we upload will be correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33762) Versioned release of flink-connector-shared-utils python scripts

2023-12-06 Thread Peter Vary (Jira)
Peter Vary created FLINK-33762:
--

 Summary: Versioned release of flink-connector-shared-utils python 
scripts
 Key: FLINK-33762
 URL: https://issues.apache.org/jira/browse/FLINK-33762
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Connectors / Common
Reporter: Peter Vary


We need a versioned release of the scripts stored in the 
flink-connector-shared-utils/python directory. This will allow even 
incompatible changes to these scripts. The connector developers could choose 
which version of the scripts they depend on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]

2023-12-06 Thread via GitHub


mxm commented on code in PR #721:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1417134168


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java:
##
@@ -30,12 +31,23 @@
 public class KubernetesScalingRealizerTest {
 
 @Test
-public void testAutoscalerOverridesVertexIdsAreSorted() {
-
+public void 
testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() {
 KubernetesJobAutoScalerContext ctx =
 TestingKubernetesAutoscalerUtils.createContext("test", null);
+FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+// Create resource with existing parallelism overrides
+resource.getSpec()
+.getFlinkConfiguration()
+.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");

Review Comment:
   Thanks for elaborating further on this. Testing is somewhat of an art. There 
is also an argument for striking a balance between a specific test and one 
which tests all possible code paths (maybe even ones which don't yet exist).
   
   Black box testing is one way of testing. White box testing is also a valid 
approach. It depends on the situation and it is also a matter of taste and how 
confident one feels that the test covers the most important scenarios.
   
   I completely understand your thought, and I've changed the test method to 
address this concern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]

2023-12-06 Thread via GitHub


1996fanrui commented on code in PR #721:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/721#discussion_r1417138932


##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java:
##
@@ -30,12 +31,23 @@
 public class KubernetesScalingRealizerTest {
 
 @Test
-public void testAutoscalerOverridesVertexIdsAreSorted() {
-
+public void 
testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() {
 KubernetesJobAutoScalerContext ctx =
 TestingKubernetesAutoscalerUtils.createContext("test", null);
+FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+// Create resource with existing parallelism overrides
+resource.getSpec()
+.getFlinkConfiguration()
+.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");

Review Comment:
   Thank you for the feedback! ❤️



##
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java:
##
@@ -30,12 +31,23 @@
 public class KubernetesScalingRealizerTest {
 
 @Test
-public void testAutoscalerOverridesVertexIdsAreSorted() {
-
+public void 
testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() {
 KubernetesJobAutoScalerContext ctx =
 TestingKubernetesAutoscalerUtils.createContext("test", null);
+FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+// Create resource with existing parallelism overrides
+resource.getSpec()
+.getFlinkConfiguration()
+.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");

Review Comment:
   Thank you for the feedback! ❤️



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33556] Test infrastructure for externalized python code [flink]

2023-12-06 Thread via GitHub


gaborgsomogyi commented on PR #23843:
URL: https://github.com/apache/flink/pull/23843#issuecomment-1842696709

   If there are no further comments, I intend to merge this tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33556][CI] Test infrastructure for externalized python code [flink-connector-shared-utils]

2023-12-06 Thread via GitHub


gaborgsomogyi commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-shared-utils/pull/27#discussion_r1417148925


##
python/install_command.sh:
##
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [[ "$@" =~ 'apache-flink-libraries' ]]; then
+pushd apache-flink-libraries
+python setup.py sdist
+pushd dist
+python -m pip install *
+popd
+popd
+fi

Review Comment:
   I think this is Flink specific, so it can be removed to avoid confusing people.
   Sooner or later the scripts will deviate because of different needs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33556][CI] Test infrastructure for externalized python code [flink-connector-shared-utils]

2023-12-06 Thread via GitHub


gaborgsomogyi commented on PR #27:
URL: 
https://github.com/apache/flink-connector-shared-utils/pull/27#issuecomment-1842710099

   After the fix, if there are no further comments, I intend to merge this tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33556][CI] Test infrastructure for externalized python code [flink-connector-shared-utils]

2023-12-06 Thread via GitHub


pvary commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-shared-utils/pull/27#discussion_r1417157457


##
python/install_command.sh:
##
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [[ "$@" =~ 'apache-flink-libraries' ]]; then
+pushd apache-flink-libraries
+python setup.py sdist
+pushd dist
+python -m pip install *
+popd
+popd
+fi

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33379) Bump CI flink version on flink-connector-elasticsearch

2023-12-06 Thread pgodowski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793669#comment-17793669
 ] 

pgodowski commented on FLINK-33379:
---

Hello,

What is the expected timeline for publishing the elasticsearch connector (I 
presume it would be 3.1.0) supporting Flink 1.18, please?

> Bump CI flink version on flink-connector-elasticsearch
> --
>
> Key: FLINK-33379
> URL: https://issues.apache.org/jira/browse/FLINK-33379
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Affects Versions: elasticsearch-3.1.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: elasticsearch-3.1.0
>
> Attachments: image-2023-10-27-16-54-04-937.png
>
>
> As Flink 1.18 released, bump the flink version in es connector .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2023-12-06 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793706#comment-17793706
 ] 

Piotr Nowojski commented on FLINK-33734:


That's a good question [~Zakelly].

Apart from that:

{quote}

jobmangager converts MergedInputChannelStateHandle to InputChannelStateHandle 
collection before assigning state handle, and the rest of the process does not 
need to be changed. 

{quote}

Doesn't this mean we will have a very similar problem during recovery? That 
sending out the RPCs during recovery will take a long time? Wouldn't it be 
better to keep the state handles merged during recovery until they reach their 
destined subtasks on TMs?

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds.Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmangager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33734) Merge unaligned checkpoint state handle

2023-12-06 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793706#comment-17793706
 ] 

Piotr Nowojski edited comment on FLINK-33734 at 12/6/23 1:16 PM:
-

That's a good question [~Zakelly]. IIUC FLIP-306 focuses on merging files, but 
unless it also wants to change the structure of the state handles, it would 
suffer from the same problem as described here. But please correct me if I'm 
wrong.

Apart from that:
{quote}jobmangager converts MergedInputChannelStateHandle to 
InputChannelStateHandle collection before assigning state handle, and the rest 
of the process does not need to be changed. 
{quote}
Doesn't this mean we will have a very similar problem during recovery? That 
sending out the RPCs during recovery will take a long time? Wouldn't it be 
better to keep the state handles merged during recovery until they reach their 
destined subtasks on TMs?


was (Author: pnowojski):
That's a good question [~Zakelly].

Apart of that:

{quote}

jobmangager converts MergedInputChannelStateHandle to InputChannelStateHandle 
collection before assigning state handle, and the rest of the process does not 
need to be changed. 

{quote}

Doesn't this mean we will have a very similar problem during recovery? That 
sending out the RPCs during recovery will take a long time? Wouldn't it be 
better to keep the state handles merged during recovery until they reach their 
destined subtasks on TMs?

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds.Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmangager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size":

Re: [PR] [FLINK-33559] Externalize Kafka Python connector code [flink-connector-kafka]

2023-12-06 Thread via GitHub


pvary commented on code in PR #69:
URL: 
https://github.com/apache/flink-connector-kafka/pull/69#discussion_r1417327078


##
flink-python/setup.py:
##
@@ -0,0 +1,158 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import glob
+import io
+import os
+import sys
+
+from setuptools import setup
+from shutil import copy, rmtree
+from xml.etree import ElementTree as ET
+
+PACKAGE_NAME = 'apache-flink-connectors-kafka'

Review Comment:
   Done



##
flink-python/pom.xml:
##
@@ -0,0 +1,209 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+4.0.0
+
+
+org.apache.flink
+flink-connector-kafka-parent
+3.1-SNAPSHOT
+
+
+flink-connector-kafka-python
+Flink : Connectors : SQL : Kafka : Python
+
+jar

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Add Java client library update process to the Upgrade page [flink-kubernetes-operator]

2023-12-06 Thread via GitHub


mxm opened a new pull request, #723:
URL: https://github.com/apache/flink-kubernetes-operator/pull/723

   Update the docs regarding Java client library upgrades. I'm curious to hear 
your opinion on this one. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33758] Implement restore tests for TemporalSort node [flink]

2023-12-06 Thread via GitHub


jnh5y commented on code in PR #23879:
URL: https://github.com/apache/flink/pull/23879#discussion_r1417390527


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecTemporalSort}. */
+public class TemporalSortTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:07", 5, 6d, 3f, null, "Hello", "b"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d")
+};
+static final TableTestProgram TEMPORAL_SORT_PROCTIME =
+TableTestProgram.of(
+"temporal-sort-proctime", "validates temporal sort 
node with proctime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"a INT",
+"b BIGINT",
+"c STRING",
+"`proctime` as PROCTIME()")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi"),
+Row.of(2, 2L, "Hello"),
+Row.of(3, 2L, "Hello world"))
+.producedAfterRestore(
+Row.of(4, 1L, "Guten Morgen"),
+Row.of(5, 2L, "Guten Tag"))
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT")
+.consumedBeforeRestore("+I[1]", "+I[2]", 
"+I[3]")
+.consumedAfterRestore("+I[4]", "+I[5]")
+.build())
+.runSql("INSERT INTO sink_t SELECT a from source_t ORDER 
BY proctime")
+.build();
+
+static final TableTestProgram TEMPORAL_SORT_ROWTIME =
+TableTestProgram.of(
+"temporal-sort-rowtime", "validates temporal sort 
node with rowtime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"ts STRING",
+"`int` INT",
+"`double` DOUBLE",
+"`float` FLOAT",
+"`bigdec` DECIMAL(10, 2)",
+"`string` STRING",
+"`name` STRING",
+

Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


reta commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1842971156

   @schulzp there was a similar change introduced into 
`flink-connector-opensearch`, I believe we could backport it to the 
Elasticsearch connector to have a similar model of configuration and failure 
handling.
   
   [1] https://github.com/apache/flink-connector-opensearch/pull/11


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33708][FLINK-33709] Introduce TraceReporter and use it to create checkpointing traces [flink]

2023-12-06 Thread via GitHub


rkhachatryan commented on code in PR #23845:
URL: https://github.com/apache/flink/pull/23845#discussion_r1417402466


##
docs/content/docs/ops/traces.md:
##
@@ -0,0 +1,126 @@
+---
+title: "Metrics"
+weight: 6
+type: docs
+aliases:
+  - /ops/traces.html
+  - /apis/traces.html
+  - /monitoring/traces.html
+---
+
+
+# Traces
+
+Flink exposes a tracing system that allows gathering and exposing traces to 
external systems.
+
+## Reporting traces
+
+You can access the tracing system from any user function that extends 
[RichFunction]({{< ref "docs/dev/datastream/user_defined_functions" 
>}}#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object via which you can report a new 
single span trace.
+
+### Reporting single Span
+
+
+A `Span` represents something that happened in Flink at certain point of time, 
that will be reported to a `TraceReporter`.
+To report a `Span` you can use the `MetricGroup#addSpan(SpanBuilder)` method.
+
+Currently we don't support traces with multiple spans. Each `Span` is 
self-contained and represents things like a checkpoint or recovery.
+{{< tabs "9612d275-bdda-4322-a01f-ae6da805e917" >}}
+{{< tab "Java" >}}
+```java
+public class MyClass {
+void doSomething() {
+// (...)
+metricGroup.addSpan(
+Span.builder(MyClass.class, "SomeAction")
+.setStartTsMillis(startTs) // Optional
+.setEndTsMillis(endTs) // Optional
+.setAttribute("foo", "bar");
+}
+}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Currently reporting Spans from Python is not supported.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Reporter
+
+For information on how to set up Flink's trace reporters please take a look at 
the [trace reporters documentation]({{< ref "docs/deployment/trace_reporters" 
>}}).
+
+## System traces
+
+Flink reports traces listed below.
+
+The tables below generally feature 5 columns:

Review Comment:
   typo: feature**s** ?



##
docs/content/docs/ops/traces.md:
##
@@ -0,0 +1,126 @@
+---
+title: "Metrics"
+weight: 6
+type: docs
+aliases:
+  - /ops/traces.html
+  - /apis/traces.html
+  - /monitoring/traces.html
+---
+
+
+# Traces
+
+Flink exposes a tracing system that allows gathering and exposing traces to 
external systems.
+
+## Reporting traces
+
+You can access the tracing system from any user function that extends 
[RichFunction]({{< ref "docs/dev/datastream/user_defined_functions" 
>}}#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object via which you can report a new 
single span trace.
+
+### Reporting single Span
+
+
+A `Span` represents something that happened in Flink at certain point of time, 
that will be reported to a `TraceReporter`.
+To report a `Span` you can use the `MetricGroup#addSpan(SpanBuilder)` method.
+
+Currently we don't support traces with multiple spans. Each `Span` is 
self-contained and represents things like a checkpoint or recovery.
+{{< tabs "9612d275-bdda-4322-a01f-ae6da805e917" >}}
+{{< tab "Java" >}}
+```java
+public class MyClass {
+void doSomething() {
+// (...)
+metricGroup.addSpan(
+Span.builder(MyClass.class, "SomeAction")
+.setStartTsMillis(startTs) // Optional
+.setEndTsMillis(endTs) // Optional
+.setAttribute("foo", "bar");
+}
+}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Currently reporting Spans from Python is not supported.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Reporter
+
+For information on how to set up Flink's trace reporters please take a look at 
the [trace reporters documentation]({{< ref "docs/deployment/trace_reporters" 
>}}).
+
+## System traces
+
+Flink reports traces listed below.
+
+The tables below generally feature 5 columns:
+
+* The "Scope" column describes what is that trace reported scope.
+
+* The "Name" column describes the name of the reported trace.
+
+* The "Attributes" column lists the names of all attributes that are reported 
with the given trace.
+
+* The "Description" column provides information as to what a given attribute 
is reporting.
+
+### Checkpointing and recovery
+

Review Comment:
   Should we clarify that the trace is reported only once the checkpoint is 
completed/failed?



##
docs/layouts/shortcodes/generated/trace_configuration.html:
##
@@ -0,0 +1,36 @@
+

Review Comment:
   I couldn't find this section.
   Shouldn't it be added explicitly to `config.md`?



##
docs/content/docs/ops/traces.md:
##
@@ -0,0 +1,126 @@
+---
+title: "Metrics"
+weight: 6
+type: docs
+aliases:
+  - /ops/traces.html
+  - /apis/traces.html
+  - /monitoring/traces.html
+---
+
+
+# Traces
+
+Flink exposes a tracing system that allows gathering and exposing traces to 
external systems.
+
+## Reporting traces
+
+You can access th

Re: [PR] [FLINK-33708][FLINK-33709] Introduce TraceReporter and use it to create checkpointing traces [flink]

2023-12-06 Thread via GitHub


rkhachatryan commented on PR #23845:
URL: https://github.com/apache/flink/pull/23845#issuecomment-1843011650

   One more thing, all the documentation changes only affect the English 
version.
   Should we also update the Chinese version (with the same content in English)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33757] Implement restore tests for Rank node [flink]

2023-12-06 Thread via GitHub


jnh5y commented on code in PR #23878:
URL: https://github.com/apache/flink/pull/23878#discussion_r1417448524


##
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test/plan/rank-test.json:
##
@@ -81,19 +52,20 @@
 },
 "orderBy" : {
   "fields" : [ {
-"index" : 0,
+"index" : 2,
 "isAscending" : true,
 "nullIsLast" : false
   } ]
 },
 "rankRange" : {
-  "type" : "Variable",
-  "endIndex" : 0
+  "type" : "Constant",
+  "start" : 1,
+  "end" : 1
 },
 "rankStrategy" : {
   "type" : "AppendFast"
 },
-"outputRowNumber" : true,
+"outputRowNumber" : false,

Review Comment:
   Let's see...  
   
   For RankProcessStrategy, the existing/ported tests only cover AppendFast.  
(There are Undefined, Retract, and UpdateFast strategies.)
   (I'm guessing that `generateUpdateBefore` is coupled with the strategy?)
   
   For RankType, only ROW_NUMBER is covered.  There are also RANK and DENSE_RANK.
   
   For RankRange, we are testing ranges of 1 and 2 rows...  that seems like 
reasonable coverage.
   
   What is useful here?  Adding coverage for the two (or three, if Undefined is 
possible) `RankProcessStrategy`s and testing the other RankTypes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-06 Thread via GitHub


qinf commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1417423381


##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java:
##
@@ -36,35 +36,37 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
 private final Exception failure = new Exception();
 
 @Test
-void testAlwaysRestart() throws Exception {
+void testMaxAttempts() {
+int maxAttempts = 13;
 final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
 new ExponentialDelayRestartBackoffTimeStrategy(
-new ManualClock(), 1L, 3L, 2.0, 4L, 0.25);
+new ManualClock(), 1L, 3L, 2.0, 4L, 0.25, maxAttempts);

Review Comment:
   The default value of the `ConfigOption` 
`restart-strategy.exponential-delay.backoff-multiplier` has been set to `1.2`. 
Is it necessary to update the backoffMultiplier to `1.2` here as well?



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java:
##
@@ -116,7 +116,9 @@ private static Optional 
getJobRestartStrateg
 
exponentialDelayConfig.getMaxBackoff().toMilliseconds(),
 exponentialDelayConfig.getBackoffMultiplier(),
 
exponentialDelayConfig.getResetBackoffThreshold().toMilliseconds(),
-exponentialDelayConfig.getJitterFactor()));
+exponentialDelayConfig.getJitterFactor(),
+
RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS

Review Comment:
   The `ExponentialDelayRestartStrategyConfiguration` contains other parameters 
needed to create `ExponentialDelayRestartBackoffTimeStrategyFactory`. Is it 
necessary to add a parameter `attemptsBeforeResetBackoff` to 
`ExponentialDelayRestartStrategyConfiguration`?



##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java:
##
@@ -73,14 +75,14 @@ void testMaxBackoff() throws Exception {
 }
 
 @Test
-void testResetBackoff() throws Exception {
+void testResetBackoff() {
 final long initialBackoffMS = 1L;
 final long resetBackoffThresholdMS = 8L;
 final ManualClock clock = new ManualClock();
 
 final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
 new ExponentialDelayRestartBackoffTimeStrategy(
-clock, initialBackoffMS, 5L, 2.0, 
resetBackoffThresholdMS, 0.25);
+clock, initialBackoffMS, 5L, 2.0, 
resetBackoffThresholdMS, 0.25, 100);

Review Comment:
   Could you help explain why the `attemptsBeforeResetBackoff` is set to 100 here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843046355

   @afedulov, thanks! I added a test that makes sure the inspector is passed to 
the `ElasticsearchWriter`.
   
   @reswqa, I looked into the failed pipelines: except for one 
(against flink 1.19-SNAPSHOT) all of them have been canceled. The pipeline logs 
do not state why. Apparently some interfaces have been moved/changed in 
1.19-SNAPSHOT, so it no longer compiles.
   
   @reta, I checked out the opensearch failure handler. It achieves the 
same in regard to error handling. However, it does not allow capturing 
additional metrics. At least for us that would not suffice.
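
   To illustrate what we mean by capturing additional metrics, here is a rough sketch — the interface name and wiring below are hypothetical, not the actual connector or OpenSearch API:

```java
import org.apache.flink.metrics.Counter;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

// Hypothetical hook invoked by the writer after each bulk request completes.
interface BulkResponseInspector {
    void inspect(BulkRequest request, BulkResponse response);
}

// Example implementation: count partially failed bulk items in a metric
// instead of (or in addition to) failing the sink.
class MetricCountingInspector implements BulkResponseInspector {
    private final Counter failedItems; // assumed to be registered on the sink's metric group

    MetricCountingInspector(Counter failedItems) {
        this.failedItems = failedItems;
    }

    @Override
    public void inspect(BulkRequest request, BulkResponse response) {
        if (!response.hasFailures()) {
            return;
        }
        for (BulkItemResponse item : response.getItems()) {
            if (item.isFailed()) {
                failedItems.inc();
            }
        }
    }
}
```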


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


reta commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843053300

   > @reta, I checked out the opensearch failure handler. It achieves the 
same in regard to error handling. However, it does not allow capturing 
additional metrics. At least for us that would not suffice.
   
   Thanks @schulzp , I think we could:
- backport the change from the Opensearch connector
- enhance it with the ability to capture additional metrics
- port this change to the Opensearch and Elasticsearch connectors


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33763) Support manual savepoint redeploy for jobs and deployments

2023-12-06 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-33763:
--

 Summary: Support manual savepoint redeploy for jobs and deployments
 Key: FLINK-33763
 URL: https://issues.apache.org/jira/browse/FLINK-33763
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Gyula Fora


A common request is to support a streamlined, user-friendly way of redeploying 
from a target savepoint.

Previously this was only possible by deleting the CR and recreating it with 
initialSavepointPath. A big downside of this approach is the loss of 
savepoint/checkpoint history in the status that some platforms may need, 
resulting in non-cleaned-up savepoints, etc.

We suggest introducing a `savepointRedeployNonce` field in the job spec, 
similar to other action trigger nonces.

If the nonce changes to a new non-null value, the job will be redeployed from 
the path specified in initialSavepointPath (or from empty state if the path is 
empty).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33763) Support manual savepoint redeploy for jobs and deployments

2023-12-06 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-33763:
--

Assignee: Gyula Fora

> Support manual savepoint redeploy for jobs and deployments
> --
>
> Key: FLINK-33763
> URL: https://issues.apache.org/jira/browse/FLINK-33763
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>
> A common request is to support a streamlined, user friendly way of 
> redeploying from a target savepoint.
> Previously this was only possible by deleting the CR and recreating it with 
> initialSavepointPath. A big downside of this approach is a loss of 
> savepoint/checkpoint history in the status that some platforms may need, 
> resulting in non-cleaned up save points etc.
> We suggest to introduce a `savepointRedeployNonce` field in the job spec 
> similar to other action trigger nonces.
> If the nonce changes to a new non null value the job will be redeployed from 
> the path specified in the initialSavepointPath (or empty state If the path is 
> empty)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33758] Implement restore tests for TemporalSort node [flink]

2023-12-06 Thread via GitHub


dawidwys commented on code in PR #23879:
URL: https://github.com/apache/flink/pull/23879#discussion_r1417474823


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecTemporalSort}. */
+public class TemporalSortTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:07", 5, 6d, 3f, null, "Hello", "b"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d")
+};
+static final TableTestProgram TEMPORAL_SORT_PROCTIME =
+TableTestProgram.of(
+"temporal-sort-proctime", "validates temporal sort 
node with proctime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"a INT",
+"b BIGINT",
+"c STRING",
+"`proctime` as PROCTIME()")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi"),
+Row.of(2, 2L, "Hello"),
+Row.of(3, 2L, "Hello world"))
+.producedAfterRestore(
+Row.of(4, 1L, "Guten Morgen"),
+Row.of(5, 2L, "Guten Tag"))
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT")
+.consumedBeforeRestore("+I[1]", "+I[2]", 
"+I[3]")
+.consumedAfterRestore("+I[4]", "+I[5]")
+.build())
+.runSql("INSERT INTO sink_t SELECT a from source_t ORDER 
BY proctime")
+.build();
+
+static final TableTestProgram TEMPORAL_SORT_ROWTIME =
+TableTestProgram.of(
+"temporal-sort-rowtime", "validates temporal sort 
node with rowtime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"ts STRING",
+"`int` INT",
+"`double` DOUBLE",
+"`float` FLOAT",
+"`bigdec` DECIMAL(10, 2)",
+"`string` STRING",
+"`name` STRING",
+ 

Re: [PR] [FLINK-33757] Implement restore tests for Rank node [flink]

2023-12-06 Thread via GitHub


dawidwys commented on code in PR #23878:
URL: https://github.com/apache/flink/pull/23878#discussion_r1417478356


##
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test/plan/rank-test.json:
##
@@ -81,19 +52,20 @@
 },
 "orderBy" : {
   "fields" : [ {
-"index" : 0,
+"index" : 2,
 "isAscending" : true,
 "nullIsLast" : false
   } ]
 },
 "rankRange" : {
-  "type" : "Variable",
-  "endIndex" : 0
+  "type" : "Constant",
+  "start" : 1,
+  "end" : 1
 },
 "rankStrategy" : {
   "type" : "AppendFast"
 },
-"outputRowNumber" : true,
+"outputRowNumber" : false,

Review Comment:
   > What is useful here? Adding coverage for the two (or three, if Undefined 
is possible) RankProcessStrategys and testing the other RankTypes?
   
   This sounds reasonable, yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33764) Incorporate GC / Heap metrics in autoscaler decisions

2023-12-06 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-33764:
--

 Summary: Incorporate GC / Heap metrics in autoscaler decisions
 Key: FLINK-33764
 URL: https://issues.apache.org/jira/browse/FLINK-33764
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


The autoscaler currently doesn't use any GC/heap metrics as part of its scaling 
decisions.

While the long-term goal may be to support vertical scaling (increasing TM 
sizes), this is currently out of scope for the autoscaler.

However, it is very important to detect cases where the throughput of certain 
vertices or the entire pipeline is critically affected by long GC pauses. In 
these cases the current autoscaler logic would wrongly assume a low true 
processing rate and scale the pipeline too high, ramping up costs and causing 
further issues.

Using the improved GC metrics introduced in 
https://issues.apache.org/jira/browse/FLINK-33318, we should measure the GC 
pauses, block scaling decisions if the pipeline spends too much time garbage 
collecting, and notify the user about the required action to increase memory.
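
As a purely illustrative sketch (the class, method, and option names below are made up; this is not the actual autoscaler code), the gating logic could look roughly like this:
{code:java}
// Illustrative only - names and the threshold option are hypothetical.
final class GcPressureGate {

    /** Fraction of wall-clock time spent in GC above which scaling is blocked (assumed option). */
    private final double maxGcTimeRatio;

    GcPressureGate(double maxGcTimeRatio) {
        this.maxGcTimeRatio = maxGcTimeRatio;
    }

    /**
     * @param gcTimeMillis accumulated GC pause time observed in the metric window
     * @param windowMillis length of the metric window
     * @return true if scaling decisions may be applied, false if they should be blocked
     */
    boolean allowScaling(long gcTimeMillis, long windowMillis) {
        double gcRatio = windowMillis == 0 ? 0.0 : (double) gcTimeMillis / windowMillis;
        if (gcRatio > maxGcTimeRatio) {
            // Rather than scaling up on a misleadingly low "true processing rate",
            // block the decision and surface an event asking the user to increase TM memory.
            return false;
        }
        return true;
    }
}
{code}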



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2023-12-06 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793770#comment-17793770
 ] 

Rui Fan commented on FLINK-33734:
-

Thanks [~Feifan Wang] for creating this Jira, and thanks [~pnowojski] for the 
feedback!
{quote}Doesn't this mean we will have a very similar problem during recovery?
{quote}
Yes, you are right.

I had an offline discussion with [~Feifan Wang] in advance. If the jobmanager 
converts MergedInputChannelStateHandle to an InputChannelStateHandle collection 
before assigning state handles, the whole recovery logic won't be changed, so 
this Jira is easy to implement.

Why do [~Feifan Wang] and I think the checkpoint duration deserves more attention 
than the recovery duration? Because checkpoints are much more frequent than recoveries.

Of course, if we want to improve the recovery logic, that's fine for me. As I 
understand it, the MergedInputChannelStateHandle can be used directly when the 
parallelism isn't changed. When a rescale happens, the channel info should be 
stored together with the filePath during recovery.
{quote} jobmangager converts MergedInputChannelStateHandle to 
InputChannelStateHandle collection before assigning state handle
{quote}
Or we can consider that as stage one, and improve the recovery duration as 
stage two? Looking forward to more feedback from you, thanks~

--

Also, I have some questions about this proposal:
{quote}Of the 950MB in the metadata file, 68% are redundant file paths.
{quote}
Can we expect the _metadata file size to be reduced by 68% after this proposal?
{quote}Structure of MergedInputChannelStateHandle :
{quote}
How does Flink serialize the MergedInputChannelStateHandle? Does it store the 
field names? If we change inputChannelIdx to idx, can it reduce the file 
size? IIUC, the idx appears very frequently in the metadata.

I'm wondering whether we could make other optimizations to make 
MergedInputChannelStateHandle simpler while ensuring that the 
InputChannelStateHandle can be restored.
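
For illustration, a simplified sketch of the conversion step described in the proposal. The merged type below just mirrors the JSON structure from the description, and the imports and the InputChannelStateHandle constructor used here are assumptions rather than a statement about the real Flink classes:
{code:java}
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;

import java.util.ArrayList;
import java.util.List;

// Hypothetical merged handle mirroring the JSON structure in the proposal.
class MergedInputChannelStateHandleSketch {

    static class MergedChannel {
        int gateIdx;
        int inputChannelIdx;
        List<Long> offsets;
        long size;
    }

    StreamStateHandle delegate; // the single shared file reference
    int subtaskIndex;
    List<MergedChannel> channels;

    // Expand back into one handle per channel on the JM side, reusing the same delegate,
    // so the rest of the state assignment logic stays unchanged.
    List<InputChannelStateHandle> expand() {
        List<InputChannelStateHandle> result = new ArrayList<>();
        for (MergedChannel c : channels) {
            result.add(
                    new InputChannelStateHandle(
                            subtaskIndex,
                            new InputChannelInfo(c.gateIdx, c.inputChannelIdx),
                            delegate,
                            c.offsets,
                            c.size));
        }
        return result;
    }
}
{code}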

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds.Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from checkpoint, jobmangager converts 
> MergedInputChannelStateHandle to InputChannelStateHandle collection before 
> assigning state handle, and the rest of the process does not need to be 
> changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54

Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-06 Thread via GitHub


1996fanrui commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1417489771


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java:
##
@@ -116,7 +116,9 @@ private static Optional 
getJobRestartStrateg
 
exponentialDelayConfig.getMaxBackoff().toMilliseconds(),
 exponentialDelayConfig.getBackoffMultiplier(),
 
exponentialDelayConfig.getResetBackoffThreshold().toMilliseconds(),
-exponentialDelayConfig.getJitterFactor()));
+exponentialDelayConfig.getJitterFactor(),
+
RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS

Review Comment:
   I didn't do it because `ExponentialDelayRestartStrategyConfiguration` has been 
deprecated since 1.19. `RestartStrategies` has been deprecated, and 
`ExponentialDelayRestartStrategyConfiguration` is an inner class of it.
   
   So the new option won't be supported in the deprecated API.
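   
   A rough stand-alone model of what the attempts cap means (jitter and the 
reset threshold are ignored here):
   
   ```java
   // Stand-alone model only, not Flink code: exponential backoff capped by max attempts.
   public class ExponentialDelayModel {
       public static void main(String[] args) {
           long initialBackoffMs = 1_000L;
           long maxBackoffMs = 60_000L;
           double multiplier = 2.0;
           int maxAttempts = 5; // corresponds to the new attempts option discussed here

           long backoff = initialBackoffMs;
           for (int attempt = 1; attempt <= maxAttempts; attempt++) {
               System.out.printf("attempt %d -> wait %d ms%n", attempt, backoff);
               backoff = Math.min((long) (backoff * multiplier), maxBackoffMs);
           }
           System.out.println("max attempts reached -> the job is failed");
       }
   }
   ```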



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32895][Scheduler] Introduce the max attempts for Exponential Delay Restart Strategy [flink]

2023-12-06 Thread via GitHub


1996fanrui commented on code in PR #23247:
URL: https://github.com/apache/flink/pull/23247#discussion_r1417491069


##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java:
##
@@ -73,14 +75,14 @@ void testMaxBackoff() throws Exception {
 }
 
 @Test
-void testResetBackoff() throws Exception {
+void testResetBackoff() {
 final long initialBackoffMS = 1L;
 final long resetBackoffThresholdMS = 8L;
 final ManualClock clock = new ManualClock();
 
 final ExponentialDelayRestartBackoffTimeStrategy restartStrategy =
 new ExponentialDelayRestartBackoffTimeStrategy(
-clock, initialBackoffMS, 5L, 2.0, 
resetBackoffThresholdMS, 0.25);
+clock, initialBackoffMS, 5L, 2.0, 
resetBackoffThresholdMS, 0.25, 100);

Review Comment:
   There isn't a strong reason; I just set it to a big value to ensure this 
test doesn't hit the limit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843099110

   @reta, sure, so I'll rename my interfaces to match those from OpenSearch and 
add the factory to the OpenSearch sink builder afterwards. I assume there is 
shared code to be reused, or is this more about API consistency?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


reta commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843109798

   > I assume there is shared code to be reused, or is this more about API 
consistency?
   
   There is no shared code (I think that is what you meant), and indeed, it is 
more about API consistency for both connectors.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33708][FLINK-33709] Introduce TraceReporter and use it to create checkpointing traces [flink]

2023-12-06 Thread via GitHub


pnowojski commented on PR #23845:
URL: https://github.com/apache/flink/pull/23845#issuecomment-1843111206

   > Should we also update the Chinese version (with the same content in 
English)?
   
   Yes, my implicit plan was to first review the English version and, once it 
gets a 👍, copy-paste the changes to the Chinese one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33708][FLINK-33709] Introduce TraceReporter and use it to create checkpointing traces [flink]

2023-12-06 Thread via GitHub


pnowojski commented on code in PR #23845:
URL: https://github.com/apache/flink/pull/23845#discussion_r1417512998


##
docs/content/docs/ops/traces.md:
##
@@ -0,0 +1,126 @@
+---
+title: "Metrics"
+weight: 6
+type: docs
+aliases:
+  - /ops/traces.html
+  - /apis/traces.html
+  - /monitoring/traces.html
+---
+
+
+# Traces
+
+Flink exposes a tracing system that allows gathering and exposing traces to 
external systems.
+
+## Reporting traces
+
+You can access the tracing system from any user function that extends 
[RichFunction]({{< ref "docs/dev/datastream/user_defined_functions" 
>}}#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object via which you can report a new 
single span trace.
+
+### Reporting single Span
+
+
+A `Span` represents something that happened in Flink at certain point of time, 
that will be reported to a `TraceReporter`.
+To report a `Span` you can use the `MetricGroup#addSpan(SpanBuilder)` method.
+
+Currently we don't support traces with multiple spans. Each `Span` is 
self-contained and represents things like a checkpoint or recovery.
+{{< tabs "9612d275-bdda-4322-a01f-ae6da805e917" >}}
+{{< tab "Java" >}}
+```java
+public class MyClass {
+void doSomething() {
+// (...)
+metricGroup.addSpan(
+Span.builder(MyClass.class, "SomeAction")
+.setStartTsMillis(startTs) // Optional
+.setEndTsMillis(endTs) // Optional
+.setAttribute("foo", "bar");
+}
+}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Currently reporting Spans from Python is not supported.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Reporter
+
+For information on how to set up Flink's trace reporters please take a look at 
the [trace reporters documentation]({{< ref "docs/deployment/trace_reporters" 
>}}).
+
+## System traces
+
+Flink reports traces listed below.
+
+The tables below generally feature 5 columns:

Review Comment:
   🤔 The sentence refers to `tables` plural, so `feature` is correct. But at the 
moment there is only a single table, so I'm not sure which way to go. I would 
lean towards keeping the plural version as it's future-proof. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33708][FLINK-33709] Introduce TraceReporter and use it to create checkpointing traces [flink]

2023-12-06 Thread via GitHub


pnowojski commented on code in PR #23845:
URL: https://github.com/apache/flink/pull/23845#discussion_r1417516719


##
docs/content/docs/ops/traces.md:
##
@@ -0,0 +1,126 @@
+---
+title: "Metrics"
+weight: 6
+type: docs
+aliases:
+  - /ops/traces.html
+  - /apis/traces.html
+  - /monitoring/traces.html
+---
+
+
+# Traces
+
+Flink exposes a tracing system that allows gathering and exposing traces to 
external systems.
+
+## Reporting traces
+
+You can access the tracing system from any user function that extends 
[RichFunction]({{< ref "docs/dev/datastream/user_defined_functions" 
>}}#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object via which you can report a new 
single span trace.
+
+### Reporting single Span
+
+
+A `Span` represents something that happened in Flink at certain point of time, 
that will be reported to a `TraceReporter`.
+To report a `Span` you can use the `MetricGroup#addSpan(SpanBuilder)` method.
+
+Currently we don't support traces with multiple spans. Each `Span` is 
self-contained and represents things like a checkpoint or recovery.
+{{< tabs "9612d275-bdda-4322-a01f-ae6da805e917" >}}
+{{< tab "Java" >}}
+```java
+public class MyClass {
+void doSomething() {
+// (...)
+metricGroup.addSpan(
+Span.builder(MyClass.class, "SomeAction")
+.setStartTsMillis(startTs) // Optional
+.setEndTsMillis(endTs) // Optional
+.setAttribute("foo", "bar");
+}
+}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Currently reporting Spans from Python is not supported.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Reporter
+
+For information on how to set up Flink's trace reporters please take a look at 
the [trace reporters documentation]({{< ref "docs/deployment/trace_reporters" 
>}}).
+
+## System traces
+
+Flink reports traces listed below.
+
+The tables below generally feature 5 columns:
+
+* The "Scope" column describes what is that trace reported scope.
+
+* The "Name" column describes the name of the reported trace.
+
+* The "Attributes" column lists the names of all attributes that are reported 
with the given trace.
+
+* The "Description" column provides information as to what a given attribute 
is reporting.
+
+### Checkpointing and recovery
+

Review Comment:
   Please check whether the wording works for you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


afedulov commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843121034

   @schulzp thanks! 
   We try to avoid Mockito usage, unless it is not possible because because of 
external dependencies that required concrete classes rather than interfaces. 
Generally speaking, resorting to Mockito is discouraged.
   
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations
   In this case it is not necessary, you can just create mock test 
implementations as private static classes within the test.
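   
   A minimal sketch of such a hand-written test double (the `FailureHandler` 
name and signature here are placeholders, not the connector's actual API):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   class BulkFailureHandlingTest {

       // Placeholder interface standing in for whatever the PR introduces.
       interface FailureHandler {
           void onFailure(Throwable failure);
       }

       /** Hand-written test double: records invocations instead of mocking them. */
       private static final class RecordingFailureHandler implements FailureHandler {
           final List<Throwable> failures = new ArrayList<>();

           @Override
           public void onFailure(Throwable failure) {
               failures.add(failure);
           }
       }

       void exampleAssertion() {
           RecordingFailureHandler handler = new RecordingFailureHandler();
           handler.onFailure(new RuntimeException("bulk item rejected"));
           // assertThat(handler.failures).hasSize(1); // with the project's assertion library
       }
   }
   ```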
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33708][FLINK-33709] Introduce TraceReporter and use it to create checkpointing traces [flink]

2023-12-06 Thread via GitHub


pnowojski commented on code in PR #23845:
URL: https://github.com/apache/flink/pull/23845#discussion_r1417523981


##
docs/layouts/shortcodes/generated/akka_configuration.html:
##
@@ -98,6 +80,24 @@
 Duration
 Timeout used for the lookup of the JobManager. The timeout 
value has to contain a time-unit specifier (ms/s/min/h/d).
 
+
+
pekko.remote-fork-join-executor.parallelism-factor
+2.0
+Double
+The parallelism factor is used to determine thread pool size 
using the following formula: ceil(available processors * factor). Resulting 
size is then bounded by the parallelism-min and parallelism-max values.
+
+
+pekko.remote-fork-join-executor.parallelism-max
+16
+Integer
+Max number of threads to cap factor-based parallelism number 
to.
+
+
+pekko.remote-fork-join-executor.parallelism-min
+8
+Integer
+Min number of threads to cap factor-based parallelism number 
to.
+

Review Comment:
   This is an automatically generated file/update. It seems like someone hasn't 
updated the docs before me, and this change popped up without my control :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-33754] Serialize QueryOperations into SQL string [flink]

2023-12-06 Thread via GitHub


dawidwys opened a new pull request, #23884:
URL: https://github.com/apache/flink/pull/23884

   
   ## What is the purpose of the change
   
   Serialize `QueryOperations` into their SQL equivalents.
   
   ## Verifying this change
   
   Added tests in `QueryOperationSqlSerializationTest`
   
   Additionally, I open this as a draft to discuss if and how to extend the 
test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33754) Serialize QueryOperations into SQL

2023-12-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33754:
---
Labels: pull-request-available  (was: )

> Serialize QueryOperations into SQL
> --
>
> Key: FLINK-33754
> URL: https://issues.apache.org/jira/browse/FLINK-33754
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33753) ContinuousFileReaderOperator consume records as mini batch

2023-12-06 Thread Prabhu Joseph (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prabhu Joseph closed FLINK-33753.
-
Resolution: Not A Problem

> ContinuousFileReaderOperator consume records as mini batch
> --
>
> Key: FLINK-33753
> URL: https://issues.apache.org/jira/browse/FLINK-33753
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.16.0
>Reporter: Prabhu Joseph
>Priority: Major
>
> The ContinuousFileReaderOperator reads and collects the records from a split 
> in a loop. If the split size is large, then the loop will take more time, and 
> then the mailbox executor won't have a chance to process the checkpoint 
> barrier. This leads to checkpoint timing out. ContinuousFileReaderOperator 
> could be improved to consume the records in a mini batch, similar to Hudi's 
> StreamReadOperator (https://issues.apache.org/jira/browse/HUDI-2485).
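
A purely illustrative, stand-alone model of the mini-batch idea (not the actual 
operator code; the real operator would go through Flink's mailbox executor): 
read at most N records per action and re-enqueue the remainder, so other mail 
such as a checkpoint barrier can run in between.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Stand-alone model only.
public class MiniBatchReadModel {
    private static final int BATCH_SIZE = 3;

    public static void main(String[] args) {
        Deque<Runnable> mailbox = new ArrayDeque<>();
        List<Integer> split = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        Iterator<Integer> records = split.iterator();

        mailbox.add(() -> readBatch(records, mailbox));
        mailbox.add(() -> System.out.println("-- checkpoint barrier processed --"));

        while (!mailbox.isEmpty()) {
            mailbox.poll().run(); // single-threaded mailbox loop
        }
    }

    private static void readBatch(Iterator<Integer> records, Deque<Runnable> mailbox) {
        for (int i = 0; i < BATCH_SIZE && records.hasNext(); i++) {
            System.out.println("emit record " + records.next());
        }
        if (records.hasNext()) {
            // Re-enqueue instead of looping over the whole split, so the barrier
            // above is processed between batches rather than after the full split.
            mailbox.add(() -> readBatch(records, mailbox));
        }
    }
}
{code}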



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [Flink 31966] Flink Kubernetes operator lacks TLS support [flink-kubernetes-operator]

2023-12-06 Thread via GitHub


tagarr commented on PR #712:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/712#issuecomment-1843133539

   @gyfora @gaborgsomogyi made required changes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33599] Run restore tests with RocksDB state backend [flink]

2023-12-06 Thread via GitHub


dawidwys merged PR #23883:
URL: https://github.com/apache/flink/pull/23883


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33599) Run restore tests with RocksDB state backend

2023-12-06 Thread Dawid Wysakowicz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-33599.

Resolution: Implemented

Implemented in 43fec308b3298ed2aad639b94140c9a2173c10cd

> Run restore tests with RocksDB state backend
> 
>
> Key: FLINK-33599
> URL: https://issues.apache.org/jira/browse/FLINK-33599
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843137406

   @afedulov, sure, I'll implement it with plain Java.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33754] Serialize QueryOperations into SQL string [flink]

2023-12-06 Thread via GitHub


flinkbot commented on PR #23884:
URL: https://github.com/apache/flink/pull/23884#issuecomment-1843138896

   
   ## CI report:
   
   * 33d967570f5b86867dc026358dbff280e09da779 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33758] Implement restore tests for TemporalSort node [flink]

2023-12-06 Thread via GitHub


jnh5y commented on code in PR #23879:
URL: https://github.com/apache/flink/pull/23879#discussion_r1417554937


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecTemporalSort}. */
+public class TemporalSortTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:07", 5, 6d, 3f, null, "Hello", "b"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d")
+};
+static final TableTestProgram TEMPORAL_SORT_PROCTIME =
+TableTestProgram.of(
+"temporal-sort-proctime", "validates temporal sort 
node with proctime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"a INT",
+"b BIGINT",
+"c STRING",
+"`proctime` as PROCTIME()")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi"),
+Row.of(2, 2L, "Hello"),
+Row.of(3, 2L, "Hello world"))
+.producedAfterRestore(
+Row.of(4, 1L, "Guten Morgen"),
+Row.of(5, 2L, "Guten Tag"))
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT")
+.consumedBeforeRestore("+I[1]", "+I[2]", 
"+I[3]")
+.consumedAfterRestore("+I[4]", "+I[5]")
+.build())
+.runSql("INSERT INTO sink_t SELECT a from source_t ORDER 
BY proctime")
+.build();
+
+static final TableTestProgram TEMPORAL_SORT_ROWTIME =
+TableTestProgram.of(
+"temporal-sort-rowtime", "validates temporal sort 
node with rowtime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"ts STRING",
+"`int` INT",
+"`double` DOUBLE",
+"`float` FLOAT",
+"`bigdec` DECIMAL(10, 2)",
+"`string` STRING",
+"`name` STRING",
+

Re: [PR] [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides [flink-kubernetes-operator]

2023-12-06 Thread via GitHub


mxm merged PR #721:
URL: https://github.com/apache/flink-kubernetes-operator/pull/721


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33710) Autoscaler redeploys pipeline for a NOOP parallelism change

2023-12-06 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793800#comment-17793800
 ] 

Maximilian Michels commented on FLINK-33710:


Additional fix via ca1d8472d1a1e817268950dae079592581fa5b8f to prevent any 
existing deployments from being affected.
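
Not the actual fix referenced above, just a minimal sketch of the underlying 
idea: derive the override string from a sorted view of the map so that equal 
maps always serialize identically.

{code:java}
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class DeterministicOverrides {

    // Sort by vertex id before joining, so the iteration order of the input map doesn't matter.
    static String toConfigString(Map<String, Integer> overrides) {
        return new TreeMap<>(overrides).entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        Map<String, Integer> first = Map.of("92542d12", 3, "9f979ed8", 1);
        Map<String, Integer> second = Map.of("9f979ed8", 1, "92542d12", 3);
        // Same overrides, same string -> no spurious SPECCHANGED event / redeploy.
        System.out.println(toConfigString(first).equals(toConfigString(second))); // true
    }
}
{code}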

> Autoscaler redeploys pipeline for a NOOP parallelism change
> ---
>
> Key: FLINK-33710
> URL: https://issues.apache.org/jira/browse/FLINK-33710
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.7.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The operator supports two modes to apply autoscaler changes:
> # Use the internal Flink config {{pipeline.jobvertex-parallelism-overrides}} 
> # Make use of Flink's Rescale API 
> For (1), a string has to be generated for the Flink config with the actual 
> overrides. This string has to be deterministic for a given map. But it is not.
> Consider the following observed log:
> {noformat}
>   >>> Event  | Info| SPECCHANGED | SCALE change(s) detected (Diff: 
> FlinkDeploymentSpec[flinkConfiguration.pipeline.jobvertex-parallelism-overrides
>  : 
> 92542d1280187bd464274368a5f86977:3,9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,1652184ffd0522859c7840a24936847c:1
>  -> 
> 9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,92542d1280187bd464274368a5f86977:3,1652184ffd0522859c7840a24936847c:1]),
>  starting reconciliation. 
> {noformat}
> The overrides are identical but the order is different which triggers a 
> redeploy. This does not seem to happen often but some deterministic string 
> generation (e.g. sorting by key) is required to prevent any NOOP updates.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33758] Implement restore tests for TemporalSort node [flink]

2023-12-06 Thread via GitHub


dawidwys commented on code in PR #23879:
URL: https://github.com/apache/flink/pull/23879#discussion_r1417576522


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecTemporalSort}. */
+public class TemporalSortTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:07", 5, 6d, 3f, null, "Hello", "b"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d")
+};
+static final TableTestProgram TEMPORAL_SORT_PROCTIME =
+TableTestProgram.of(
+"temporal-sort-proctime", "validates temporal sort 
node with proctime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"a INT",
+"b BIGINT",
+"c STRING",
+"`proctime` as PROCTIME()")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi"),
+Row.of(2, 2L, "Hello"),
+Row.of(3, 2L, "Hello world"))
+.producedAfterRestore(
+Row.of(4, 1L, "Guten Morgen"),
+Row.of(5, 2L, "Guten Tag"))
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT")
+.consumedBeforeRestore("+I[1]", "+I[2]", 
"+I[3]")
+.consumedAfterRestore("+I[4]", "+I[5]")
+.build())
+.runSql("INSERT INTO sink_t SELECT a from source_t ORDER 
BY proctime")
+.build();
+
+static final TableTestProgram TEMPORAL_SORT_ROWTIME =
+TableTestProgram.of(
+"temporal-sort-rowtime", "validates temporal sort 
node with rowtime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"ts STRING",
+"`int` INT",
+"`double` DOUBLE",
+"`float` FLOAT",
+"`bigdec` DECIMAL(10, 2)",
+"`string` STRING",
+"`name` STRING",
+ 

Re: [PR] [FLINK-33758] Implement restore tests for TemporalSort node [flink]

2023-12-06 Thread via GitHub


jnh5y commented on code in PR #23879:
URL: https://github.com/apache/flink/pull/23879#discussion_r1417587636


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalSortTestPrograms.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecTemporalSort}. */
+public class TemporalSortTestPrograms {
+
+static final Row[] BEFORE_DATA = {
+Row.of("2020-10-10 00:00:01", 1, 1d, 1f, new BigDecimal("1.11"), "Hi", 
"a"),
+Row.of("2020-10-10 00:00:02", 2, 2d, 2f, new BigDecimal("2.22"), 
"Comment#1", "a"),
+Row.of("2020-10-10 00:00:07", 5, 6d, 3f, null, "Hello", "b"),
+Row.of("2020-10-10 00:00:07", 3, 3d, 3f, null, "Hello", "b"),
+// out of order
+Row.of("2020-10-10 00:00:06", 6, 6d, 6f, new BigDecimal("6.66"), "Hi", 
"b"),
+Row.of("2020-10-10 00:00:08", 3, null, 3f, new BigDecimal("3.33"), 
"Comment#2", "a"),
+// late event
+Row.of("2020-10-10 00:00:04", 5, 5d, null, new BigDecimal("5.55"), 
"Hi", "a"),
+Row.of("2020-10-10 00:00:16", 4, 4d, 4f, new BigDecimal("4.44"), "Hi", 
"b")
+};
+
+static final Row[] AFTER_DATA = {
+Row.of("2020-10-10 00:00:40", 10, 3d, 3f, new BigDecimal("4.44"), 
"Comment#4", "a"),
+Row.of("2020-10-10 00:00:42", 11, 4d, 4f, new BigDecimal("5.44"), 
"Comment#5", "d")
+};
+static final TableTestProgram TEMPORAL_SORT_PROCTIME =
+TableTestProgram.of(
+"temporal-sort-proctime", "validates temporal sort 
node with proctime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"a INT",
+"b BIGINT",
+"c STRING",
+"`proctime` as PROCTIME()")
+.producedBeforeRestore(
+Row.of(1, 1L, "Hi"),
+Row.of(2, 2L, "Hello"),
+Row.of(3, 2L, "Hello world"))
+.producedAfterRestore(
+Row.of(4, 1L, "Guten Morgen"),
+Row.of(5, 2L, "Guten Tag"))
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT")
+.consumedBeforeRestore("+I[1]", "+I[2]", 
"+I[3]")
+.consumedAfterRestore("+I[4]", "+I[5]")
+.build())
+.runSql("INSERT INTO sink_t SELECT a from source_t ORDER 
BY proctime")
+.build();
+
+static final TableTestProgram TEMPORAL_SORT_ROWTIME =
+TableTestProgram.of(
+"temporal-sort-rowtime", "validates temporal sort 
node with rowtime")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema(
+"ts STRING",
+"`int` INT",
+"`double` DOUBLE",
+"`float` FLOAT",
+"`bigdec` DECIMAL(10, 2)",
+"`string` STRING",
+"`name` STRING",
+

[jira] [Updated] (FLINK-33763) Support manual savepoint redeploy for jobs and deployments

2023-12-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33763:
---
Labels: pull-request-available  (was: )

> Support manual savepoint redeploy for jobs and deployments
> --
>
> Key: FLINK-33763
> URL: https://issues.apache.org/jira/browse/FLINK-33763
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
>
> A common request is to support a streamlined, user friendly way of 
> redeploying from a target savepoint.
> Previously this was only possible by deleting the CR and recreating it with 
> initialSavepointPath. A big downside of this approach is a loss of 
> savepoint/checkpoint history in the status that some platforms may need, 
> resulting in non-cleaned up save points etc.
> We suggest to introduce a `savepointRedeployNonce` field in the job spec 
> similar to other action trigger nonces.
> If the nonce changes to a new non null value the job will be redeployed from 
> the path specified in the initialSavepointPath (or empty state If the path is 
> empty)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33763] Support savepoint redeployment through a nonce [flink-kubernetes-operator]

2023-12-06 Thread via GitHub


gyfora opened a new pull request, #724:
URL: https://github.com/apache/flink-kubernetes-operator/pull/724

   ## What is the purpose of the change
   
   A common request is to support a streamlined, user-friendly way of 
redeploying from a target savepoint.
   Previously this was only possible by deleting the CR and recreating it with 
initialSavepointPath. A big downside of this approach is the loss of 
savepoint/checkpoint history in the status, which some platforms may need, 
resulting in non-cleaned-up savepoints, etc.
   
   We introduce a `savepointRedeployNonce` field in the job spec, similar to 
other action trigger nonces.
   If the nonce changes to a new non-null value, the job will be redeployed from 
the path specified in the initialSavepointPath (or from empty state if the path 
is empty).
   
   As redeployment requires the deletion of HA metadata and previous checkpoint 
info, rollbacks are not supported after redeployments (changing the nonce).
   
   The PR also introduces an improvement to the restartNonce and 
savepointRedeployNonce to ignore null changes (when the user nulls out the 
nonce). This is more practical and in line with the behaviour of the 
savepoint/checkpoint triggering logic.
   
   ## Brief change log
   
 - *Add savepointRedeployNonce to `spec.job`*
 - *Implement redeployment logic*
 - *Tests*
 - *Doc updates* [TODO] 
   
   ## Verifying this change
   
   Unit tests and manual validation on local and remote envs.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
yes
 - Core observer or reconciler logic that is regularly executed: yes
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs [TODO] 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33645] Taskmanager env vars in config not given to taskmanager [flink-kubernetes-operator]

2023-12-06 Thread via GitHub


tagarr commented on PR #722:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/722#issuecomment-1843281986

   Fixed checkstyle errors (apologies, I could have sworn I ran a build before 
creating the PR).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling [flink-connector-elasticsearch]

2023-12-06 Thread via GitHub


schulzp commented on PR #83:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/83#issuecomment-1843307919

   @afedulov, fixed the test (and extended it)
   
   @reta, I added a `FailureHandler` with a default implementation that 
resembles the current fail-on-any-failure behavior.
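   
   A rough sketch of what such a default could look like (names are 
placeholders; the PR defines the real interface):
   
   ```java
   // Placeholder names for illustration; see the PR for the actual API.
   @FunctionalInterface
   interface FailureHandler {
       void onFailure(Throwable failure);
   }

   final class FailureHandlers {
       private FailureHandlers() {}

       /** Mimics the current behaviour: any bulk failure fails the sink immediately. */
       static FailureHandler failOnAny() {
           return failure -> {
               throw new RuntimeException("Bulk request failed", failure);
           };
       }
   }
   ```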


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33718) Cleanup the usage of deprecated StreamTableEnvironment#toAppendStream

2023-12-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33718:
---
Labels: pull-request-available  (was: )

> Cleanup the usage of deprecated StreamTableEnvironment#toAppendStream
> -
>
> Key: FLINK-33718
> URL: https://issues.apache.org/jira/browse/FLINK-33718
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jane Chan
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33718][table] Cleanup the usage of deprecated StreamTableEnvironment#toAppendStream [flink]

2023-12-06 Thread via GitHub


snuyanzin opened a new pull request, #23885:
URL: https://github.com/apache/flink/pull/23885

   
   ## What is the purpose of the change
   
   This PR replaces the deprecated `StreamTableEnvironment#toAppendStream` with 
`StreamTableEnvironment#toDataStream`, as recommended in the Javadoc.
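   
   A minimal self-contained example of the mechanical change (not a snippet from 
this PR):
   
   ```java
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.Table;
   import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
   import org.apache.flink.types.Row;

   public class ToDataStreamMigration {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
           Table table = tEnv.sqlQuery("SELECT * FROM (VALUES (1, 'Hi')) AS t(a, b)");

           // Before (deprecated):
           // DataStream<Row> rows = tEnv.toAppendStream(table, Row.class);

           // After:
           DataStream<Row> rows = tEnv.toDataStream(table);

           rows.print();
           env.execute();
       }
   }
   ```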
   
   ## Brief change log
   
   tests in table-planner
   
   
   ## Verifying this change
   
   
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: ( no)
 - The runtime per-record code paths (performance sensitive): ( no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no )
 - The S3 file system connector: ( no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-33718) Cleanup the usage of deprecated StreamTableEnvironment#toAppendStream

2023-12-06 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-33718:
---

Assignee: Sergey Nuyanzin

> Cleanup the usage of deprecated StreamTableEnvironment#toAppendStream
> -
>
> Key: FLINK-33718
> URL: https://issues.apache.org/jira/browse/FLINK-33718
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jane Chan
>Assignee: Sergey Nuyanzin
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33718][table] Cleanup the usage of deprecated StreamTableEnvironment#toAppendStream [flink]

2023-12-06 Thread via GitHub


flinkbot commented on PR #23885:
URL: https://github.com/apache/flink/pull/23885#issuecomment-1843366916

   
   ## CI report:
   
   * 19810b57dd41666d22a91399d41be0bc0c4794d6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-12-06 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793866#comment-17793866
 ] 

Kevin Tseng commented on FLINK-33545:
-

Hi Mason,

The Kafka producer only throws an exception in the following situations:
 # broker connection failure (caused either by the broker or by an intermediary 
such as a firewall timeout)
 # a timeout (delivery or request timeout)
 # a record issue that violates a producer setting (request size limit, etc.)

For anything other than these, the producer will keep retrying (the default 
number of retries is Integer.MAX_VALUE) and will not throw an error.

However, a flush should block until one of the above three conditions occurs.

This can be verified using the snippet I included above in the description of 
the issue.

That being said, given the nature and stability of KafkaProducer, I had to 
assume it worked as intended.

If a task is truly single-threaded, including committing / barrier handling, 
there shouldn't be any data loss. But that's not what was observed.

The records that were lost are generally not large in volume, resembling only 
one or two producer thread failures. And in the case of AT_LEAST_ONCE this 
shouldn't happen.

Given that, I think it might be safer to put the commit / flush at the end of 
the commit cycle, instead of before the actual checkpoint takes place.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because 

Re: [PR] [FLINK-33757] Implement restore tests for Rank node [flink]

2023-12-06 Thread via GitHub


jnh5y commented on code in PR #23878:
URL: https://github.com/apache/flink/pull/23878#discussion_r1417894843


##
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test/plan/rank-test.json:
##
@@ -81,19 +52,20 @@
 },
 "orderBy" : {
   "fields" : [ {
-"index" : 0,
+"index" : 2,
 "isAscending" : true,
 "nullIsLast" : false
   } ]
 },
 "rankRange" : {
-  "type" : "Variable",
-  "endIndex" : 0
+  "type" : "Constant",
+  "start" : 1,
+  "end" : 1
 },
 "rankStrategy" : {
   "type" : "AppendFast"
 },
-"outputRowNumber" : true,
+"outputRowNumber" : false,

Review Comment:
   As an update, RANK and DENSE_RANK are not supported for streaming; I missed 
that when reading initially.
   
   The "Undefined" strategy is set before an actual strategy has been assigned. 
 I have added tests for the other two.
   
   Let me know if you find anything else to cover and/or have any naming 
suggestions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33757] Implement restore tests for Rank node [flink]

2023-12-06 Thread via GitHub


jnh5y commented on PR #23878:
URL: https://github.com/apache/flink/pull/23878#issuecomment-1843578408

   Let me know if you want me to clean up any of the git history. The merge 
from master was done by resolving conflicts in the GitHub UI.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33765) Flink SQL to support COLLECTLIST

2023-12-06 Thread Zhenzhong Xu (Jira)
Zhenzhong Xu created FLINK-33765:


 Summary: Flink SQL to support COLLECTLIST
 Key: FLINK-33765
 URL: https://issues.apache.org/jira/browse/FLINK-33765
 Project: Flink
  Issue Type: Improvement
  Components: API / DataSet
Reporter: Zhenzhong Xu


Flink SQL currently supports COLLECT, which returns a multiset, however, given 
support for casting from multiset to other types (especially array/list) is 
*very* limited, see 
[here,|https://github.com/apache/flink/blob/master/docs/content/docs/dev/table/types.md#casting]
 this is creating lots of headaches for ease of use.

Can we support COLLECT_LIST as a built-in system function?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33765) Flink SQL to support COLLECT_LIST

2023-12-06 Thread Zhenzhong Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenzhong Xu updated FLINK-33765:
-
Description: 
Flink SQL currently supports COLLECT, which returns a multiset, however, given 
support for casting from multiset to other types (especially array/list) is 
*very* limited, see 
[here,|https://github.com/apache/flink/blob/master/docs/content/docs/dev/table/types.md#casting]
 this is creating lots of headaches for ease of use.

Can we support COLLECT_LIST as a built-in system function? (I believe Spark 
supports it)

  was:
Flink SQL currently supports COLLECT, which returns a multiset, however, given 
support for casting from multiset to other types (especially array/list) is 
*very* limited, see 
[here,|https://github.com/apache/flink/blob/master/docs/content/docs/dev/table/types.md#casting]
 this is creating lots of headaches for ease of use.

Can we support COLLECT_LIST as a built-in system function?


> Flink SQL to support COLLECT_LIST
> -
>
> Key: FLINK-33765
> URL: https://issues.apache.org/jira/browse/FLINK-33765
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Reporter: Zhenzhong Xu
>Priority: Major
>
> Flink SQL currently supports COLLECT, which returns a multiset, however, 
> given support for casting from multiset to other types (especially 
> array/list) is *very* limited, see 
> [here,|https://github.com/apache/flink/blob/master/docs/content/docs/dev/table/types.md#casting]
>  this is creating lots of headaches for ease of use.
> Can we support COLLECT_LIST as a built-in system function? (I believe Spark 
> supports it)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33765) Flink SQL to support COLLECT_LIST

2023-12-06 Thread Zhenzhong Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenzhong Xu updated FLINK-33765:
-
Summary: Flink SQL to support COLLECT_LIST  (was: Flink SQL to support 
COLLECTLIST)

> Flink SQL to support COLLECT_LIST
> -
>
> Key: FLINK-33765
> URL: https://issues.apache.org/jira/browse/FLINK-33765
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Reporter: Zhenzhong Xu
>Priority: Major
>
> Flink SQL currently supports COLLECT, which returns a multiset, however, 
> given support for casting from multiset to other types (especially 
> array/list) is *very* limited, see 
> [here,|https://github.com/apache/flink/blob/master/docs/content/docs/dev/table/types.md#casting]
>  this is creating lots of headaches for ease of use.
> Can we support COLLECT_LIST as a built-in system function?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33718][table] Cleanup the usage of deprecated StreamTableEnvironment#toAppendStream [flink]

2023-12-06 Thread via GitHub


JingGe commented on code in PR #23885:
URL: https://github.com/apache/flink/pull/23885#discussion_r1417905005


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java:
##
@@ -66,7 +66,7 @@ void testPassingExecutionParameters() {
 
 // trigger translation
 Table table = tEnv.sqlQuery("SELECT * FROM test");
-tEnv.toAppendStream(table, Row.class);
+tEnv.toDataStream(table, Row.class);

Review Comment:
   NIT: `tEnv.toDataStream(table)` should work too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33726][sql-client] print time cost for streaming queries [flink]

2023-12-06 Thread via GitHub


JingGe commented on code in PR #23868:
URL: https://github.com/apache/flink/pull/23868#discussion_r1417918470


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java:
##
@@ -114,6 +108,30 @@ public void displayResults() throws SqlExecutionException {
 }
 }
 
+private void printTerminatedFooter(AtomicInteger receivedRowCount) {

Review Comment:
   Thanks for the hint. I checked Trino: the time consumption is printed after 
the user cancels:
https://github.com/trinodb/trino/blob/ae7849e5b81b0c0eff7e980222502bf392a9a7f3/client/trino-cli/src/main/java/io/trino/cli/StatusPrinter.java#L121
   
https://github.com/trinodb/trino/blob/ae7849e5b81b0c0eff7e980222502bf392a9a7f3/client/trino-cli/src/main/java/io/trino/cli/StatusPrinter.java#L350



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33708][FLINK-33709] Introduce TraceReporter and use it to create checkpointing traces [flink]

2023-12-06 Thread via GitHub


rkhachatryan commented on code in PR #23845:
URL: https://github.com/apache/flink/pull/23845#discussion_r1417895747


##
flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/traces/slf4j/Slf4jTraceReporter.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.traces.slf4j;
+
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.reporter.TraceReporter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TraceReporter} that exports {@link org.apache.flink.traces.Span 
Spans} via SLF4J {@link
+ * Logger}.
+ */
+public class Slf4jTraceReporter implements TraceReporter {
+private static final Logger LOG = 
LoggerFactory.getLogger(Slf4jTraceReporter.class);
+
+@Override
+public void notifyOfAddedSpan(Span span) {
+StringBuilder builder = new StringBuilder();
+builder.append("Reported span: ");
+builder.append(span.toString());
+LOG.info(builder.toString());

Review Comment:
   Why can't we just pass `builder` to `LOG`?
   ```
   LOG.info("Reported span: {}", builder);
   ```
   
   That would make it possible to eliminate the `toString` call at runtime, depending on the logging 
configuration.
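
A minimal sketch of the reporter method with that suggestion applied, going one step further by dropping the `StringBuilder` entirely; the class name here is illustrative, not the one in the PR:

```java
import org.apache.flink.traces.Span;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Minimal variant of the reporter method using parameterized logging.
class ParameterizedSpanLogging {
    private static final Logger LOG = LoggerFactory.getLogger(ParameterizedSpanLogging.class);

    void notifyOfAddedSpan(Span span) {
        // SLF4J only renders the argument (via toString) when INFO is actually enabled,
        // so nothing is built if the reporter's logger is configured above INFO.
        LOG.info("Reported span: {}", span);
    }
}
```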



##
docs/content/docs/deployment/config.md:
##
@@ -289,6 +289,15 @@ Enabling RocksDB's native metrics may cause degraded 
performance and should be s
 
 
 
+# Traces

Review Comment:
   NIT: This doesn't show up in TOC - it should be 2 or more `#` with the 
current settings.
   
   Other existing sections are affected as well (e.g. `History Server` is missing).
   
   Settings can be fixed by adding to `docs/config.toml`:
   ```
   [markup.tableOfContents]
   startLevel = 1
   ```
   Not sure if this change would be reasonable and whether it can be added as a 
hotfix to this PR.



##
docs/content/docs/ops/traces.md:
##
@@ -0,0 +1,128 @@
+---
+title: "Traces"
+weight: 6
+type: docs
+aliases:
+  - /ops/traces.html
+  - /apis/traces.html
+  - /monitoring/traces.html
+---
+
+
+# Traces
+
+Flink exposes a tracing system that allows gathering and exposing traces to 
external systems.
+
+## Reporting traces
+
+You can access the tracing system from any user function that extends 
[RichFunction]({{< ref "docs/dev/datastream/user_defined_functions" 
>}}#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object via which you can report a new 
single span trace.
+
+### Reporting single Span
+
+
+A `Span` represents something that happened in Flink at certain point of time, 
that will be reported to a `TraceReporter`.
+To report a `Span` you can use the `MetricGroup#addSpan(SpanBuilder)` method.
+
+Currently we don't support traces with multiple spans. Each `Span` is 
self-contained and represents things like a checkpoint or recovery.
+{{< tabs "9612d275-bdda-4322-a01f-ae6da805e917" >}}
+{{< tab "Java" >}}
+```java
+public class MyClass {
+void doSomething() {
+// (...)
+metricGroup.addSpan(
+Span.builder(MyClass.class, "SomeAction")
+.setStartTsMillis(startTs) // Optional
+.setEndTsMillis(endTs) // Optional
+.setAttribute("foo", "bar");
+}
+}
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+Currently reporting Spans from Python is not supported.
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Reporter
+
+For information on how to set up Flink's trace reporters please take a look at 
the [trace reporters documentation]({{< ref "docs/deployment/trace_reporters" 
>}}).
+
+## System traces
+
+Flink reports traces listed below.
+
+The tables below generally feature 5 columns:
+
+* The "Scope" column describes what is that trace reported scope.
+
+* The "Name" column describes the name of the reported trace.
+
+* The "Attributes" column lists the names of all attributes that are reported 
with the given trace.
+
+* The "Description" column provides information as to what a given attribute 
is reporting.
+
+### Checkpointing
+
+Flink is reporting a single span trace for the whole checkpoint once 
checkpoi
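
The `traces.md` excerpt above shows `addSpan` on an injected `MetricGroup`; below is a minimal sketch of reaching that group from a user function via `getRuntimeContext()`, assuming the API exactly as the excerpt documents it. The function and attribute names are illustrative, and a per-record span is used only to keep the example short; the excerpt describes spans as coarse events such as a checkpoint or recovery.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.traces.Span;

public class TracedUppercase extends RichMapFunction<String, String> {
    @Override
    public String map(String value) {
        long startTs = System.currentTimeMillis();
        String result = value.toUpperCase();

        // Any RichFunction can reach the metric group through its runtime context.
        MetricGroup group = getRuntimeContext().getMetricGroup();
        group.addSpan(
                Span.builder(TracedUppercase.class, "UppercaseCall")
                        .setStartTsMillis(startTs)
                        .setEndTsMillis(System.currentTimeMillis())
                        .setAttribute("inputLength", String.valueOf(value.length())));
        return result;
    }
}
```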

Re: [PR] [FLINK-33708][FLINK-33709] Introduce TraceReporter and use it to create checkpointing traces [flink]

2023-12-06 Thread via GitHub


rkhachatryan commented on code in PR #23845:
URL: https://github.com/apache/flink/pull/23845#discussion_r1417918412


##
docs/content/docs/deployment/config.md:
##
@@ -289,6 +289,15 @@ Enabling RocksDB's native metrics may cause degraded 
performance and should be s
 
 
 
+# Traces

Review Comment:
   NIT: This doesn't show up in TOC - it should be 2 or more `#` with the 
current settings.
   
   Other existing sections are affected as well (e.g. `History Server` is missing).
   
   Settings can be fixed by adding to `docs/config.toml`:
   ```
   [markup.tableOfContents]
   startLevel = 1
   ```
   Not sure if this change would be reasonable and whether it can be added as a 
hotfix to this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-33676] Implement RestoreTests for WindowAggregate [flink]

2023-12-06 Thread via GitHub


jnh5y opened a new pull request, #23886:
URL: https://github.com/apache/flink/pull/23886

   ## What is the purpose of the change
   
   Implement restore tests for WindowAggregate node.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   * Added restore tests for the WindowAggregate node, which verify the generated 
compiled plan against the saved compiled plan
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33676) Implement restore tests for WindowAggregate node

2023-12-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33676:
---
Labels: pull-request-available  (was: )

> Implement restore tests for WindowAggregate node
> 
>
> Key: FLINK-33676
> URL: https://issues.apache.org/jira/browse/FLINK-33676
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

