[jira] [Created] (FLINK-34007) Flink Job stuck in suspend state after recovery from failure in HA Mode

2024-01-05 Thread Zhenqiu Huang (Jira)
Zhenqiu Huang created FLINK-34007:
-

 Summary: Flink Job stuck in suspend state after recovery from 
failure in HA Mode
 Key: FLINK-34007
 URL: https://issues.apache.org/jira/browse/FLINK-34007
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.1, 1.18.2
Reporter: Zhenqiu Huang


The observation is that the JobManager goes into the SUSPENDED state after a failed 
container is unable to register itself with the ResourceManager within the timeout.

JM Log:

2024-01-04 02:58:39,210 INFO  
org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner [] - 
JobMasterServiceLeadershipRunner for job 217cee964b2cfdc3115fb74cac0ec550 was 
revoked leadership with leader id eda6fee6-ce02-4076-9a99-8c43a92629f7. 
Stopping current JobMasterServiceProcess.
2024-01-04 02:58:58,347 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - 
http://172.16.71.11:8081 lost leadership
2024-01-04 02:58:58,347 INFO  
org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - 
Resource manager service is revoked leadership with session id 
eda6fee6-ce02-4076-9a99-8c43a92629f7.
2024-01-04 02:58:58,348 INFO  
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner [] - 
DefaultDispatcherRunner was revoked the leadership with leader id 
eda6fee6-ce02-4076-9a99-8c43a92629f7. Stopping the DispatcherLeaderProcess.
2024-01-04 02:58:58,348 INFO  
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - 
Stopping SessionDispatcherLeaderProcess.
2024-01-04 02:58:58,349 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping 
dispatcher pekko.tcp://flink@172.16.71.11:6123/user/rpc/dispatcher_1.
2024-01-04 02:58:58,349 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Stopping the JobMaster for job 
'amp-ade-fitness-clickstream-projection-uat' (217cee964b2cfdc3115fb74cac0ec550).
2024-01-04 02:58:58,349 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping all 
currently running jobs of dispatcher 
pekko.tcp://flink@172.16.71.11:6123/user/rpc/dispatcher_1.
2024-01-04 02:58:58,351 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
217cee964b2cfdc3115fb74cac0ec550 reached terminal state SUSPENDED.
2024-01-04 02:58:58,352 INFO  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - 
Stopping credential renewal
2024-01-04 02:58:58,352 INFO  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - 
Stopped credential renewal
2024-01-04 02:58:58,352 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] 
- Closing the slot manager.
2024-01-04 02:58:58,351 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
amp-ade-fitness-clickstream-projection-uat (217cee964b2cfdc3115fb74cac0ec550) 
switched from state RUNNING to SUSPENDED.
org.apache.flink.util.FlinkException: AdaptiveScheduler is being stopped.
at 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.closeAsync(AdaptiveScheduler.java:474)
 ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
at 
org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:1093)
 ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
at 
org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:1056)
 ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
at 
org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:454) 
~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239)
 ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.lambda$terminate$0(PekkoRpcActor.java:574)
 ~[flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.terminate(PekkoRpcActor.java:573)
 ~[flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:196)
 ~[flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 
[flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 
[flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
[flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]

Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443547398


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java:
##
@@ -52,8 +69,26 @@ public void setPredicate(String predicate) {
 }
 
 public ParameterizedPredicate combine(String operator, 
ParameterizedPredicate that) {
+int paramIndex = String.format("(%s %s ", this.predicate, 
operator).length();
+if (!that.indexesOfPredicatePlaceHolders.isEmpty()) {
+paramIndex = paramIndex + 
that.indexesOfPredicatePlaceHolders.get(0);
+}
+
 this.predicate = String.format("(%s %s %s)", this.predicate, operator, 
that.predicate);
 this.parameters = ArrayUtils.addAll(this.parameters, that.parameters);
+
+for (int i = 0; i < this.indexesOfPredicatePlaceHolders.size(); i++) {
+// increment all the existing indexes to account for the new 
additional first begin
+// bracket
+this.indexesOfPredicatePlaceHolders.set(
+i, this.indexesOfPredicatePlaceHolders.get(i) + 1);
+}
+if (that.predicate.equals(
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER)
+|| (!that.indexesOfPredicatePlaceHolders.isEmpty())) {
+// add index if that is a placeholder or has a placeholder.
+this.indexesOfPredicatePlaceHolders.add(paramIndex);
+}

Review Comment:
   Also I tend to think that the approach with indexes will not work because, between 
finding the indexes and building `resolvedConditions`, there is an optimizer which could 
change the SQL a bit.
   An example of a query where such a change happens:
   ```sql
   "SELECT * FROM a left join d FOR SYSTEM_TIME AS OF a.proctime on ((d.age = 
50 AND d.type = 0) OR (d.type = 1 AND d.age =40))  and a.ip = d.ip"
   ```



##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java:
##
@@ -52,8 +69,26 @@ public void setPredicate(String predicate) {
 }
 
 public ParameterizedPredicate combine(String operator, 
ParameterizedPredicate that) {
+int paramIndex = String.format("(%s %s ", this.predicate, 
operator).length();
+if (!that.indexesOfPredicatePlaceHolders.isEmpty()) {
+paramIndex = paramIndex + 
that.indexesOfPredicatePlaceHolders.get(0);
+}
+
 this.predicate = String.format("(%s %s %s)", this.predicate, operator, 
that.predicate);
 this.parameters = ArrayUtils.addAll(this.parameters, that.parameters);
+
+for (int i = 0; i < this.indexesOfPredicatePlaceHolders.size(); i++) {
+// increment all the existing indexes to account for the new 
additional first begin
+// bracket
+this.indexesOfPredicatePlaceHolders.set(
+i, this.indexesOfPredicatePlaceHolders.get(i) + 1);
+}
+if (that.predicate.equals(
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER)
+|| (!that.indexesOfPredicatePlaceHolders.isEmpty())) {
+// add index if that is a placeholder or has a placeholder.
+this.indexesOfPredicatePlaceHolders.add(paramIndex);
+}

Review Comment:
   Also I tend to think that the approach with indexes will not work because, between 
finding the indexes and building `resolvedConditions`, there is an optimizer which could 
change the SQL a bit.
   An example of a query where such a change happens:
   ```sql
   SELECT * FROM a left join d FOR SYSTEM_TIME AS OF a.proctime on ((d.age = 50 
AND d.type = 0) OR (d.type = 1 AND d.age =40))  and a.ip = d.ip;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443523454


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java:
##
@@ -52,8 +69,26 @@ public void setPredicate(String predicate) {
 }
 
 public ParameterizedPredicate combine(String operator, 
ParameterizedPredicate that) {
+int paramIndex = String.format("(%s %s ", this.predicate, 
operator).length();
+if (!that.indexesOfPredicatePlaceHolders.isEmpty()) {
+paramIndex = paramIndex + 
that.indexesOfPredicatePlaceHolders.get(0);
+}
+
 this.predicate = String.format("(%s %s %s)", this.predicate, operator, 
that.predicate);
 this.parameters = ArrayUtils.addAll(this.parameters, that.parameters);
+
+for (int i = 0; i < this.indexesOfPredicatePlaceHolders.size(); i++) {
+// increment all the existing indexes to account for the new 
additional first begin
+// bracket
+this.indexesOfPredicatePlaceHolders.set(
+i, this.indexesOfPredicatePlaceHolders.get(i) + 1);
+}
+if (that.predicate.equals(
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER)
+|| (!that.indexesOfPredicatePlaceHolders.isEmpty())) {
+// add index if that is a placeholder or has a placeholder.
+this.indexesOfPredicatePlaceHolders.add(paramIndex);
+}

Review Comment:
   This will not work for cases when the constant is on the left side, e.g.
   ```java
   @Test
   public void test123() {
   util.verifyExecPlan(
   "SELECT * FROM a left join d FOR SYSTEM_TIME AS OF 
a.proctime on (50 = d.age OR 1 = d.type)  and a.ip = d.ip");
   }
   ```
   this test will fail with 
   ```
   Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:659)
at java.util.ArrayList.get(ArrayList.java:435)
at 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getResolvedConditions(JdbcDynamicTableSource.java:157)
at 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getLookupRuntimeProvider(JdbcDynamicTableSource.java:120)
at 
org.apache.flink.table.planner.plan.utils.LookupJoinUtil.createLookupRuntimeProvider(LookupJoinUtil.java:626)
at 
org.apache.flink.table.planner.plan.utils.LookupJoinUtil.isAsyncLookup(LookupJoinUtil.java:413)
   ```
   






Re: [PR] [FLINK-33817][flink-protobuf] Set ReadDefaultValues=False by default in proto3 for performance improvement [flink]

2024-01-05 Thread via GitHub


flinkbot commented on PR #24035:
URL: https://github.com/apache/flink/pull/24035#issuecomment-1879438867

   
   ## CI report:
   
   * a0672a71f647609a9a4790943070cc56f7da1d10 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-33817) Allow ReadDefaultValues = False for non primitive types on Proto3

2024-01-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33817:
---
Labels: pull-request-available  (was: )

> Allow ReadDefaultValues = False for non primitive types on Proto3
> -
>
> Key: FLINK-33817
> URL: https://issues.apache.org/jira/browse/FLINK-33817
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.18.0
>Reporter: Sai Sharath Dandi
>Priority: Major
>  Labels: pull-request-available
>
> *Background*
>  
> The current Protobuf format 
> [implementation|https://github.com/apache/flink/blob/c3e2d163a637dca5f49522721109161bd7ebb723/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java]
>  always sets ReadDefaultValues=True when using the Proto3 version. This can 
> cause severe performance degradation for large Protobuf schemas with OneOf 
> fields, because the entire generated code needs to be executed during 
> deserialization even when certain fields are not present in the data, and all 
> the subsequent nested fields could otherwise be skipped. Proto3 supports 
> hasXXX() methods for checking field presence for non-primitive types since 
> Proto version 
> [3.15|https://github.com/protocolbuffers/protobuf/releases/tag/v3.15.0]. In 
> internal performance benchmarks at our company, we've seen almost a 10x 
> difference in performance for one of our real production use cases when 
> allowing ReadDefaultValues=False to be set with the Proto3 version. The exact 
> difference in performance depends on the schema complexity and data payload, 
> but we should allow users to set readDefaultValues=False in general.
>  
> *Solution*
>  
> Support using ReadDefaultValues=False with the Proto3 version. We need to be 
> careful to check for field presence only on non-primitive types if 
> ReadDefaultValues is false and the version used is Proto3.
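
For illustration, a minimal sketch of the field-presence check this enables; the message types here are hypothetical stand-ins for protobuf-generated classes, not part of the Flink codebase:

{code:java}
// With readDefaultValues=false the converter can skip fields that are not set,
// instead of materializing default instances for every nested message.
interface Address {
    String getCity();
}

interface User {
    boolean hasAddress();   // field-presence check for a non-primitive (message) field
    Address getAddress();   // returns a default instance when the field is absent
}

final class PresenceAwareConverter {
    /** Descend into the nested message only when it is actually present in the payload. */
    static String extractCity(User user) {
        if (!user.hasAddress()) {
            return null; // the whole nested conversion is skipped
        }
        return user.getAddress().getCity();
    }
}
{code}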



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33817][flink-protobuf] Set ReadDefaultValues=False by default in proto3 for performance improvement [flink]

2024-01-05 Thread via GitHub


sharath1709 opened a new pull request, #24035:
URL: https://github.com/apache/flink/pull/24035

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443527603


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -94,30 +97,76 @@ public JdbcDynamicTableSource(
 
 @Override
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
-// JDBC only support non-nested look up keys
+// JDBC only supports non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
+
 for (int i = 0; i < keyNames.length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
 JdbcRowDataLookupFunction lookupFunction =
 new JdbcRowDataLookupFunction(
 options,
 lookupMaxRetryTimes,
 
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
 
DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
 keyNames,
-rowType);
+rowType,
+
getResolvedConditions(this.pushdownParameterizedPredicates));
 if (cache != null) {
 return PartialCachingLookupProvider.of(lookupFunction, cache);
 } else {
 return LookupFunctionProvider.of(lookupFunction);
 }
 }
 
+@VisibleForTesting
+protected String[] getResolvedConditions(List<ParameterizedPredicate> parameterizedPredicates) {
+String[] conditions = null;
+if (parameterizedPredicates != null) {
+conditions = new String[parameterizedPredicates.size()];
+
+for (int predicateIndex = 0;
+predicateIndex < parameterizedPredicates.size();
+predicateIndex++) {
+ParameterizedPredicate pushdownParameterizedPredicate =
+parameterizedPredicates.get(predicateIndex);
+String predicate = 
pushdownParameterizedPredicate.getPredicate();
+Serializable[] parameters = 
pushdownParameterizedPredicate.getParameters();
+ArrayList<Integer> indexesOfPredicatePlaceHolders =
+
pushdownParameterizedPredicate.getIndexesOfPredicatePlaceHolders();
+// build up the resolved condition.
+StringBuilder resolvedCondition = new StringBuilder();
+if (parameters == null || parameters.length == 0) {
+// no parameter values to resolve (for example when there 
is a unary
+// operation)
+resolvedCondition.append(predicate);
+} else {
+int processingIndex = 0;
+for (int parameterIndex = 0;
+parameterIndex < parameters.length;
+parameterIndex++) {
+resolvedCondition.append(
+predicate.substring(
+processingIndex,
+
indexesOfPredicatePlaceHolders.get(parameterIndex)));

Review Comment:
   no need to do substring since `append` can append only a part of a string based on the provided boundaries, e.g.
   ```java
   resolvedCondition.append(
   predicate, processingIndex, 
indexesOfPredicatePlaceHolders.get(parameterIndex));
   ```
   






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443526792


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -94,30 +97,76 @@ public JdbcDynamicTableSource(
 
 @Override
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
-// JDBC only support non-nested look up keys
+// JDBC only supports non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
+
 for (int i = 0; i < keyNames.length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
 JdbcRowDataLookupFunction lookupFunction =
 new JdbcRowDataLookupFunction(
 options,
 lookupMaxRetryTimes,
 
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
 
DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
 keyNames,
-rowType);
+rowType,
+
getResolvedConditions(this.pushdownParameterizedPredicates));
 if (cache != null) {
 return PartialCachingLookupProvider.of(lookupFunction, cache);
 } else {
 return LookupFunctionProvider.of(lookupFunction);
 }
 }
 
+@VisibleForTesting
+protected String[] getResolvedConditions(List<ParameterizedPredicate> parameterizedPredicates) {
+String[] conditions = null;
+if (parameterizedPredicates != null) {
+conditions = new String[parameterizedPredicates.size()];
+
+for (int predicateIndex = 0;
+predicateIndex < parameterizedPredicates.size();
+predicateIndex++) {
+ParameterizedPredicate pushdownParameterizedPredicate =
+parameterizedPredicates.get(predicateIndex);
+String predicate = 
pushdownParameterizedPredicate.getPredicate();
+Serializable[] parameters = 
pushdownParameterizedPredicate.getParameters();
+ArrayList<Integer> indexesOfPredicatePlaceHolders =
+
pushdownParameterizedPredicate.getIndexesOfPredicatePlaceHolders();
+// build up the resolved condition.
+StringBuilder resolvedCondition = new StringBuilder();
+if (parameters == null || parameters.length == 0) {
+// no parameter values to resolve (for example when there 
is a unary
+// operation)
+resolvedCondition.append(predicate);
+} else {
+int processingIndex = 0;
+for (int parameterIndex = 0;
+parameterIndex < parameters.length;
+parameterIndex++) {
+resolvedCondition.append(
+predicate.substring(
+processingIndex,
+
indexesOfPredicatePlaceHolders.get(parameterIndex)));
+resolvedCondition.append(parameters[parameterIndex]);
+processingIndex = 
indexesOfPredicatePlaceHolders.get(parameterIndex) + 1;
+}
+resolvedCondition.append(
+predicate.substring(processingIndex, 
predicate.length()));

Review Comment:
   ```suggestion
   resolvedCondition.append(
   predicate.substring(processingIndex));
   ```
   no need to specify the last index if it is the end of the string






Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


snuyanzin commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443521093


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java:
##
@@ -97,12 +147,14 @@ private Optional 
renderBinaryOperator(
 Optional leftOperandString = 
allOperands.get(0).accept(this);
 
 Optional rightOperandString = 
allOperands.get(1).accept(this);
-
-return leftOperandString.flatMap(
-left -> rightOperandString.map(right -> left.combine(operator, 
right)));
+Optional renderedParameterizedPredicate =
+leftOperandString.flatMap(
+left -> rightOperandString.map(right -> 
left.combine(operator, right)));
+return renderedParameterizedPredicate;
 }
 
-private Optional renderUnaryOperator(
+@VisibleForTesting
+protected Optional renderUnaryOperator(

Review Comment:
   Why is it marked as `VisibleForTesting` if it is not used in any test?



##
flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -390,7 +390,7 @@ void testJdbcLookupPropertiesWithExcludeEmptyResult() {
 assertThat(actual).isEqualTo(expected);
 }
 
-private Map getAllOptions() {
+protected static Map getAllOptions() {

Review Comment:
   why do we need this?



##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java:
##
@@ -23,18 +23,35 @@
 import org.apache.commons.lang3.ArrayUtils;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 
 /** A data class that model parameterized sql predicate. */
 @Experimental
 public class ParameterizedPredicate {
 private String predicate;
 private Serializable[] parameters;
+private ArrayList<Integer> indexesOfPredicatePlaceHolders = new ArrayList<>();

Review Comment:
   ```suggestion
   private List<Integer> indexesOfPredicatePlaceHolders = new ArrayList<>();
   ```
   It is recommended to use interfaces rather than concrete implementations



##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java:
##
@@ -52,8 +69,26 @@ public void setPredicate(String predicate) {
 }
 
 public ParameterizedPredicate combine(String operator, 
ParameterizedPredicate that) {
+int paramIndex = String.format("(%s %s ", this.predicate, 
operator).length();
+if (!that.indexesOfPredicatePlaceHolders.isEmpty()) {
+paramIndex = paramIndex + 
that.indexesOfPredicatePlaceHolders.get(0);
+}
+
 this.predicate = String.format("(%s %s %s)", this.predicate, operator, 
that.predicate);
 this.parameters = ArrayUtils.addAll(this.parameters, that.parameters);
+
+for (int i = 0; i < this.indexesOfPredicatePlaceHolders.size(); i++) {
+// increment all the existing indexes to account for the new 
additional first begin
+// bracket
+this.indexesOfPredicatePlaceHolders.set(
+i, this.indexesOfPredicatePlaceHolders.get(i) + 1);
+}
+if (that.predicate.equals(
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER)
+|| (!that.indexesOfPredicatePlaceHolders.isEmpty())) {
+// add index if that is a placeholder or has a placeholder.
+this.indexesOfPredicatePlaceHolders.add(paramIndex);
+}

Review Comment:
   This will not work for cases when the constant is on the left side, e.g.
   ```java
   @Test
   public void test123() {
   util.verifyExecPlan(
   "SELECT * FROM a left join d FOR SYSTEM_TIME AS OF 
a.proctime on (50 = d.age OR 1 = d.type)  and a.ip = d.ip");
   }
   ```
   this test will fail with 
   ```
   Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:659)
at java.util.ArrayList.get(ArrayList.java:435)
at 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getResolvedConditions(JdbcDynamicTableSource.java:157)
at 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getLookupRuntimeProvider(JdbcDynamicTableSource.java:120)
at 
org.apache.flink.table.planner.plan.utils.LookupJoinUtil.createLookupRuntimeProvider(LookupJoinUtil.java:626)
at 
org.apache.flink.table.planner.plan.utils.LookupJoinUtil.isAsyncLookup(LookupJoinUtil.java:413)
   ```
   
   The method itself does some things which could be omitted; I would suggest replacing it with 
   ```suggestion
   this.predicate = String.format("(%s %s %s)", this.predicate, 
operator, that.predicate);
   this.parameters = ArrayUtils.addAll(this.parameters, 
that.parameters);
   
   

[jira] [Created] (FLINK-34006) Flink terminates the execution of an application when there is a network problem between TaskManagers

2024-01-05 Thread Sophia (Jira)
Sophia created FLINK-34006:
--

 Summary: Flink terminates the execution of an application when 
there is a network problem between TaskManagers
 Key: FLINK-34006
 URL: https://issues.apache.org/jira/browse/FLINK-34006
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Task
Affects Versions: 1.17.1
Reporter: Sophia


Flink terminates an application when two TaskManagers are disconnected, although 
there are enough resources in the cluster to run the application and we use 
checkpointing with restarts.

 

We deploy Flink v1.17.1 on a cluster of six nodes with Ubuntu 18.04; the cluster 
consists of a JobManager and five TaskManagers. We use Flink's Standalone resource 
manager. We set the number of slots per TaskManager to one and submit a WordCount 
application with a parallelism of three. We enable Flink checkpointing and configure 
the fixed-delay restart failover strategy to attempt a restart three times before 
termination, with 10 seconds between attempts.

The application starts running on the first three TaskManagers.

If the communication is broken between two of the TaskManagers that run the 
application, the job fails and the JobManager tries to restart it. When the job 
fails, the resources on those TaskManagers are freed. When the JobManager restarts 
the job, it selects the same three TaskManagers it chose in the first attempt, and 
the job fails again. After three attempts, Flink terminates the job with the 
exception: Connecting to remote task manager has failed.

 

These are the JobManager Configurations:
 * taskmanager.numberOfTaskSlots: 1
 * Enable checkpointing: --checkpointing
 * execution.checkpointing.interval: 3min
 * Enabling restart failover strategy
 * restart-strategy.type: fixed-delay
 * restart-strategy.fixed-delay.attempts: 3
 * restart-strategy.fixed-delay.delay: 10 s

 

command: ./bin/flink run -p 3 examples/streaming/WordCount.jar --checkpointing 
--input ~/flink/alice.txt
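
For reference, a minimal sketch of the same restart/checkpoint settings expressed through the DataStream API (class name and pipeline are illustrative, not the reporter's actual job):

{code:java}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(3 * 60 * 1000L); // checkpoint every 3 minutes
        // 3 restart attempts, 10 seconds between attempts, then the job is terminated
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

        env.fromElements("to be or not to be").print(); // placeholder pipeline
        env.execute("WordCount with fixed-delay restarts");
    }
}
{code}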





Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]

2024-01-05 Thread via GitHub


rkhachatryan commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1443459209


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##
@@ -105,6 +110,10 @@ public class RocksDBIncrementalRestoreOperation 
implements RocksDBRestoreOper
 
 private boolean isKeySerializerCompatibilityChecked;
 
+private final TriConsumerWithException<
+Collection<IncrementalLocalKeyedStateHandle>, byte[], byte[], Exception>
+rescalingRestoreFromLocalStateOperation;
+

Review Comment:
   This can be replaced by a boolean flag (`useIngestDbRestoreMode`) and a 
"normal" `if` and call in `restoreWithRescaling`. 
   I think it would be more readable.
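   A rough sketch of what that could look like (field and method names here are assumptions for illustration, not the PR's actual code):
   ```java
   // assumes a boolean field `useIngestDbRestoreMode` and the two restore paths
   // as private methods of the restore operation
   private void restoreFromLocalState(
           Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
           byte[] startKeyGroupPrefixBytes,
           byte[] stopKeyGroupPrefixBytes)
           throws Exception {
       if (useIngestDbRestoreMode) {
           rescaleClipIngestDb(
                   localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
       } else {
           rescaleCopyFromTemporaryInstance(
                   localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
       }
   }
   ```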






Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]

2024-01-05 Thread via GitHub


rkhachatryan commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1443422130


##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java:
##
@@ -419,7 +431,12 @@ private KeyedOneInputStreamOperatorTestHarness getHarne
 }
 
 private StateBackend getStateBackend() throws Exception {
-return new RocksDBStateBackend("file://" + 
rootFolder.newFolder().getAbsolutePath(), true);
+RocksDBStateBackend rocksDBStateBackend =
+new RocksDBStateBackend("file://" + 
rootFolder.newFolder().getAbsolutePath(), true);
+Configuration configuration = new Configuration();

Review Comment:
   Missing import for `Configuration`.






Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]

2024-01-05 Thread via GitHub


rkhachatryan commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1443421426


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java:
##
@@ -154,6 +162,54 @@ private static void deleteRange(
 }
 }
 
+/**
+ * Clip the entries in the CF according to the range [begin_key, end_key). 
Any entries outside
+ * this range will be completely deleted (including tombstones).
+ *
+ * @param db the target need to be clipped.
+ * @param columnFamilyHandles the column family need to be clipped.
+ * @param beginKeyBytes the begin key bytes
+ * @param endKeyBytes the end key bytes
+ */
+public static void clipColumnFamilies(
+RocksDB db,
+List<ColumnFamilyHandle> columnFamilyHandles,
+byte[] beginKeyBytes,
+byte[] endKeyBytes)
+throws RocksDBException {
+
+for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
+db.clipColumnFamily(columnFamilyHandle, beginKeyBytes, 
endKeyBytes);
+}
+}
+
+public static List>
+exportColumnFamilies(
+RocksDB db,
+List<ColumnFamilyHandle> columnFamilyHandles,
+List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,

Review Comment:
   These two lists should match each other, right?
   Should we check with `Preconditions.checkState(columnFamilyHandles.size() == 
stateMetaInfoSnapshots.size());`,
   or better pass RocksDB instance here?






Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]

2024-01-05 Thread via GitHub


rkhachatryan commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1443420860


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java:
##
@@ -305,6 +311,11 @@ private EmbeddedRocksDBStateBackend(
 overlapFractionThreshold >= 0 && this.overlapFractionThreshold 
<= 1,
 "Overlap fraction threshold of restoring should be between 0 
and 1");
 
+useIngestDbRestoreMode =
+original.useIngestDbRestoreMode == 
UNDEFINED_USE_INGEST_DB_RESTORE_MODE
+? config.get(USE_INGEST_DB_RESTORE_MODE)
+: original.useIngestDbRestoreMode;

Review Comment:
   Won't this ignore `original.useIngestDbRestoreMode` if it's set to `false`?
   (I guess this is a copy-paste from other fields, but they aren't boolean)
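   As a sketch (not the PR's code), a nullable `Boolean` sentinel would let an explicitly configured `false` survive the copy:
   ```java
   // null = not configured; any non-null value is the user's explicit choice (true OR false)
   final class RestoreModeResolution {
       static boolean resolve(Boolean originalSetting, boolean configValue) {
           return originalSetting != null ? originalSetting : configValue;
       }

       public static void main(String[] args) {
           System.out.println(resolve(Boolean.FALSE, true)); // false - explicit choice preserved
           System.out.println(resolve(null, true));          // true  - falls back to the config value
       }
   }
   ```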



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##
@@ -329,74 +332,164 @@ private void 
restoreWithRescaling(Collection restoreStateHandl
 CompositeKeySerializationUtils.serializeKeyGroup(
 keyGroupRange.getEndKeyGroup() + 1, 
stopKeyGroupPrefixBytes);
 
-// Insert all remaining state through creating temporary RocksDB 
instances
+rescalingRestoreFromLocalStateOperation.accept(
+localKeyedStateHandles, startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes);
+} finally {
+// Cleanup all download directories
+allDownloadSpecs.stream()
+.map(StateHandleDownloadSpec::getDownloadDestination)
+.forEach(this::cleanUpPathQuietly);
+}
+}
+
+private void rescaleCopyFromTemporaryInstance(
+Collection 
localKeyedStateHandles,
+byte[] startKeyGroupPrefixBytes,
+byte[] stopKeyGroupPrefixBytes)
+throws Exception {
+
+// Choose the best state handle for the initial DB
+final IncrementalLocalKeyedStateHandle selectedInitialHandle =
+
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
+localKeyedStateHandles, keyGroupRange, 
overlapFractionThreshold);
+
+Preconditions.checkNotNull(selectedInitialHandle);
+
+// Remove the selected handle from the list so that we don't restore 
it twice.
+localKeyedStateHandles.remove(selectedInitialHandle);
+
+// Init the base DB instance with the initial state
+initBaseDBForRescaling(selectedInitialHandle);
+
+for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
+logger.info("Starting to restore from state handle: {} with 
rescaling.", stateHandle);
+
+try (RestoredDBInstance tmpRestoreDBInfo =
+restoreTempDBInstanceFromLocalState(stateHandle);
+RocksDBWriteBatchWrapper writeBatchWrapper =
+new RocksDBWriteBatchWrapper(
+this.rocksHandle.getDb(), writeBatchSize)) 
{
+
+List tmpColumnFamilyDescriptors =
+tmpRestoreDBInfo.columnFamilyDescriptors;
+List tmpColumnFamilyHandles =
+tmpRestoreDBInfo.columnFamilyHandles;
+
+// iterating only the requested descriptors automatically 
skips the default
+// column
+// family handle
+for (int descIdx = 0; descIdx < 
tmpColumnFamilyDescriptors.size(); ++descIdx) {
+ColumnFamilyHandle tmpColumnFamilyHandle = 
tmpColumnFamilyHandles.get(descIdx);
+
+ColumnFamilyHandle targetColumnFamilyHandle =
+
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+null,
+
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
+.columnFamilyHandle;
+
+try (RocksIteratorWrapper iterator =
+RocksDBOperationUtils.getRocksIterator(
+tmpRestoreDBInfo.db,
+tmpColumnFamilyHandle,
+tmpRestoreDBInfo.readOptions)) {
+
+iterator.seek(startKeyGroupPrefixBytes);
+
+while (iterator.isValid()) {
+
+if 
(RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
+iterator.key(), stopKeyGroupPrefixBytes)) {
+writeBatchWrapper.put(
+targetColumnFamilyHandle, 
iterator.key(), iterator.value());
+   

Re: [PR] [FLINK-33996][table-planner]: Avoid merging projects if leads to redundant computation [flink]

2024-01-05 Thread via GitHub


jeyhunkarimov commented on PR #24033:
URL: https://github.com/apache/flink/pull/24033#issuecomment-1879333784

   @flinkbot run azure





Re: [PR] [FLINK-33611] [flink-protobuf] Support Large Protobuf Schemas [flink]

2024-01-05 Thread via GitHub


sharath1709 commented on PR #23937:
URL: https://github.com/apache/flink/pull/23937#issuecomment-1879310386

   @flinkbot run azure





Re: [PR] [FLINK-33996][table-planner]: Avoid merging projects if leads to redundant computation [flink]

2024-01-05 Thread via GitHub


jeyhunkarimov commented on PR #24033:
URL: https://github.com/apache/flink/pull/24033#issuecomment-1879308733

   @flinkbot run azure





Re: [PR] [FLINK-34005] Implement restore tests for MiniBatchAssigner node [flink]

2024-01-05 Thread via GitHub


flinkbot commented on PR #24034:
URL: https://github.com/apache/flink/pull/24034#issuecomment-1879185905

   
   ## CI report:
   
   * a671536a73ceee8b2fb8ec1cef69f342f979cd1e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-34005) Implement restore tests for MiniBatchAssigner node

2024-01-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34005:
---
Labels: pull-request-available  (was: )

> Implement restore tests for MiniBatchAssigner node
> --
>
> Key: FLINK-34005
> URL: https://issues.apache.org/jira/browse/FLINK-34005
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Bonnie Varghese
>Assignee: Bonnie Varghese
>Priority: Major
>  Labels: pull-request-available
>






[PR] [FLINK-34005] Implement restore tests for MiniBatchAssigner node [flink]

2024-01-05 Thread via GitHub


bvarghese1 opened a new pull request, #24034:
URL: https://github.com/apache/flink/pull/24034

   
   
   ## What is the purpose of the change
   
   *Add restore tests for MiniBatchAssigner node*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added restore tests for MiniBatchAssigner node which verifies the 
generated compiled plan with the saved compiled plan.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   





[jira] [Created] (FLINK-34005) Implement restore tests for MiniBatchAssigner node

2024-01-05 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-34005:
---

 Summary: Implement restore tests for MiniBatchAssigner node
 Key: FLINK-34005
 URL: https://issues.apache.org/jira/browse/FLINK-34005
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Bonnie Varghese
Assignee: Bonnie Varghese








Re: [PR] [hotfix] Update NOTICE files to reflect year 2023 [flink-connector-hive]

2024-01-05 Thread via GitHub


snuyanzin commented on PR #4:
URL: 
https://github.com/apache/flink-connector-hive/pull/4#issuecomment-1879126027

   superseded by https://github.com/apache/flink-connector-hive/pull/6





Re: [PR] [hotfix] Update NOTICE files to reflect year 2023 [flink-connector-hive]

2024-01-05 Thread via GitHub


snuyanzin closed pull request #4: [hotfix] Update NOTICE files to reflect year 
2023
URL: https://github.com/apache/flink-connector-hive/pull/4





Re: [PR] [FLINK-34002] [FLINK-33883][jdbc] Bump CI flink version on flink-connector-elasticsearch to support Flink 1.19 [flink-connector-elasticsearch]

2024-01-05 Thread via GitHub


snuyanzin commented on PR #86:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/86#issuecomment-1879122467

   I submitted a PR to your branch to make it passing tests with jdk17+
   https://github.com/MartijnVisser/flink-connector-elasticsearch/pull/3





[jira] [Comment Edited] (FLINK-33925) Extended failure handling for bulk requests (elasticsearch back port)

2024-01-05 Thread Peter Schulz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803629#comment-17803629
 ] 

Peter Schulz edited comment on FLINK-33925 at 1/5/24 5:13 PM:
--

[~martijnvisser], this PR seems stuck. I'm not sure how to interpret the failed 
CI pipeline. It complains about some ITs violating architecture rules:

{code}
ITCASE tests should use a MiniCluster resource or extension' was violated (3 
times):
org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not 
satisfy…
org.apache.flink.connector.opensearch.sink.OpensearchWriterITCase does not 
satisfy…
org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does 
not satisfy…
{code}

Except for {{OpensearchWriterITCase}} my PR did not touch those files. How can 
we move this forward?


was (Author: schulzp):
[~martijnvisser], this PR seems stuck. I'm not sure how to interpret the failed 
CI pipeline. It complains about some ITs violating architecture rules:

{code}
ITCASE tests should use a MiniCluster resource or extension' was violated (3 
times):
org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not 
satisfy…
org.apache.flink.connector.opensearch.sink.OpensearchWriterITCase does not 
satisfy…
org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does 
not satisfy…
{code}

Except for `OpensearchWriterITCase` my PR did not touch those files. How can we 
move this forward?

> Extended failure handling for bulk requests (elasticsearch back port)
> -
>
> Key: FLINK-33925
> URL: https://issues.apache.org/jira/browse/FLINK-33925
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.1
>Reporter: Peter Schulz
>Assignee: Peter Schulz
>Priority: Major
>  Labels: pull-request-available
>
> This is a back port of the implementation for the elasticsearch connector, 
> see FLINK-32028, to achieve consistent APIs.





[jira] [Comment Edited] (FLINK-33925) Extended failure handling for bulk requests (elasticsearch back port)

2024-01-05 Thread Peter Schulz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803629#comment-17803629
 ] 

Peter Schulz edited comment on FLINK-33925 at 1/5/24 5:12 PM:
--

[~martijnvisser], this PR seems stuck. I'm not sure how to interpret the failed 
CI pipeline. It complains about some ITs violating architecture rules:

{code}
ITCASE tests should use a MiniCluster resource or extension' was violated (3 
times):
org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not 
satisfy…
org.apache.flink.connector.opensearch.sink.OpensearchWriterITCase does not 
satisfy…
org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does 
not satisfy…
{code}

Except for `OpensearchWriterITCase` my PR did not touch those files. How can we 
move this forward?


was (Author: schulzp):
[~martijnvisser], this PR seems stuck. I'm not sure how to interpret the failed 
CI pipeline. It complains about some ITs violating architecture rules:

```
ITCASE tests should use a MiniCluster resource or extension' was violated (3 
times):
org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not 
satisfy…
org.apache.flink.connector.opensearch.sink.OpensearchWriterITCase does not 
satisfy…
org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does 
not satisfy…
```

Except for `OpensearchWriterITCase` my PR did not touch those files. How can we 
move this forward?

> Extended failure handling for bulk requests (elasticsearch back port)
> -
>
> Key: FLINK-33925
> URL: https://issues.apache.org/jira/browse/FLINK-33925
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.1
>Reporter: Peter Schulz
>Assignee: Peter Schulz
>Priority: Major
>  Labels: pull-request-available
>
> This is a back port of the implementation for the elasticsearch connector, 
> see FLINK-32028, to achieve consistent APIs.





[jira] [Commented] (FLINK-33925) Extended failure handling for bulk requests (elasticsearch back port)

2024-01-05 Thread Peter Schulz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803629#comment-17803629
 ] 

Peter Schulz commented on FLINK-33925:
--

[~martijnvisser], this PR seems stuck. I'm not sure how to interpret the failed 
CI pipeline. It complains about some ITs violating architecture rules:

```
ITCASE tests should use a MiniCluster resource or extension' was violated (3 
times):
org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not 
satisfy…
org.apache.flink.connector.opensearch.sink.OpensearchWriterITCase does not 
satisfy…
org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does 
not satisfy…
```

Except for `OpensearchWriterITCase` my PR did not touch those files. How can we 
move this forward?
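
For context, this is roughly what the violated rule expects an ITCase to declare (a sketch using the JUnit 5 `MiniClusterExtension` from flink-test-utils; the class name and settings are illustrative):

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ExampleSinkITCase {

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    @Test
    void runsAgainstMiniCluster() {
        // the actual test would submit a job against the shared MiniCluster here
    }
}
```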

> Extended failure handling for bulk requests (elasticsearch back port)
> -
>
> Key: FLINK-33925
> URL: https://issues.apache.org/jira/browse/FLINK-33925
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Opensearch
>Affects Versions: opensearch-1.0.1
>Reporter: Peter Schulz
>Assignee: Peter Schulz
>Priority: Major
>  Labels: pull-request-available
>
> This is a back port of the implementation for the elasticsearch connector, 
> see FLINK-32028, to achieve consistent APIs.





[jira] [Commented] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-05 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803626#comment-17803626
 ] 

Jeyhun Karimov commented on FLINK-33996:


Thanks for reporting the issue [~hackergin], and thanks for your comment 
[~xuyangzhong]. I mostly agree with [~xuyangzhong]. An alternative (and simpler) 
solution would be to add an extra check in the {{FlinkRelUtil::mergeable}} function, 
since all projection/calc merging rules consult the result of this function before 
merging (a rough sketch of such a check is included after this comment).

[~hackergin] IMO adding a new rule would be tricky since

- We would need to inject the new rule into multiple places in the optimization 
process (codebase), because 1) the project and calc merging rules are used several 
times and 2) the order of the new optimization rule matters
- If we add a new rule, and the two project/calc nodes are already merged (by the 
existing merging rules), it will be non-trivial for our new rule to roll back 
(separate the merged project/calc nodes)

What do you guys think? Also, please feel free to check/review the PR.
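
A rough sketch of that kind of check (an illustration only, not the actual PR code):

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexVisitorImpl;

final class MergeCheckSketch {

    /** Refuse to merge when the top project references a non-trivial bottom expression twice. */
    static boolean mergeable(Project topProject, Project bottomProject) {
        // count how often each bottom field is referenced from the top project
        Map<Integer, Integer> refCounts = new HashMap<>();
        RexVisitorImpl<Void> counter =
                new RexVisitorImpl<Void>(true) {
                    @Override
                    public Void visitInputRef(RexInputRef inputRef) {
                        refCounts.merge(inputRef.getIndex(), 1, Integer::sum);
                        return null;
                    }
                };
        for (RexNode expr : topProject.getProjects()) {
            expr.accept(counter);
        }
        for (Map.Entry<Integer, Integer> entry : refCounts.entrySet()) {
            RexNode bottomExpr = bottomProject.getProjects().get(entry.getKey());
            boolean trivial = bottomExpr instanceof RexInputRef || bottomExpr instanceof RexLiteral;
            if (!trivial && entry.getValue() > 1) {
                // merging would duplicate an expensive expression such as REGEXP_REPLACE
                return false;
            }
        }
        return true;
    }
}
{code}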

> Support disabling project rewrite when multiple exprs in the project 
> reference the same sub project field.
> --
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>  Labels: pull-request-available
>
> When multiple top projects reference the same bottom project, project rewrite 
> rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explain plan for select a || 'a' as a, a || 'b' as b FROM (select 
> REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
> ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> {code}
> It can be observed that after project rewrite, REGEXP_REPLACE is calculated twice. 
> Generally speaking, regular expression matching is a time-consuming operation 
> and we usually do not want it to be calculated multiple times. Therefore, for 
> this scenario, we can support disabling project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule





Re: [PR] [FLINK-33996][table-planner]: Avoid merging projects if leads to redundant computation [flink]

2024-01-05 Thread via GitHub


flinkbot commented on PR #24033:
URL: https://github.com/apache/flink/pull/24033#issuecomment-1878973201

   
   ## CI report:
   
   * d687103dabf6ac2a0af763e8a3b845ef9629c7ba UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33996:
---
Labels: pull-request-available  (was: )

> Support disabling project rewrite when multiple exprs in the project 
> reference the same sub project field.
> --
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>  Labels: pull-request-available
>
> When multiple top projects reference the same bottom project, project rewrite 
> rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explain plan for select a || 'a' as a, a || 'b' as b FROM (select 
> REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
> ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> {code}
> It can be observed that after the project rewrite, REGEXP_REPLACE is calculated twice. 
> Generally speaking, regular expression matching is a time-consuming operation 
> and we usually do not want it to be calculated multiple times. Therefore, for 
> this scenario, we can support disabling project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33996][table-planner]: Avoid merging projects if leads to redundant computation [flink]

2024-01-05 Thread via GitHub


jeyhunkarimov opened a new pull request, #24033:
URL: https://github.com/apache/flink/pull/24033

   
   ## What is the purpose of the change
   
   The current optimizer/planner merges two Projection/Calc nodes if they are 
   deterministic. In some scenarios, this check is not enough. In particular, when 
   the top Projection/Calc node uses values already computed by the bottom 
   Projection/Calc node, merging the two might lead to redundant computation 
   (recomputing the same expressions several times). In fact, our test suite 
   already contained many tests that computed the same expression several times.
   
   
   ## Brief change log
   
 - Extend the `mergeable` method and add an additional check
 - Add tests
   
   
   ## Verifying this change
   
   This change added tests to `CalcMergeTestBase`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-33775) Report JobInitialization traces

2024-01-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-33775.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged commit 793a66b into apache:master

> Report JobInitialization traces
> ---
>
> Key: FLINK-33775
> URL: https://issues.apache.org/jira/browse/FLINK-33775
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33695) FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-01-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-33695.
--
Resolution: Fixed

> FLIP-384: Introduce TraceReporter and use it to create checkpointing and 
> recovery traces
> 
>
> Key: FLINK-33695
> URL: https://issues.apache.org/jira/browse/FLINK-33695
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.19.0
>
>
> https://cwiki.apache.org/confluence/x/TguZE
> *Motivation*
> Currently Flink has limited observability of the checkpoint and recovery 
> processes.
> For checkpointing, Flink has a very detailed overview in the Flink WebUI, 
> which works great in many use cases; however, it is problematic if one is 
> operating multiple Flink clusters, or if the cluster/JM dies. Additionally there 
> are a couple of metrics (like lastCheckpointDuration or lastCheckpointSize), 
> however those metrics have a couple of issues:
> * They are reported and refreshed periodically, depending on the 
> MetricReporter settings, which doesn't take the checkpointing frequency into 
> account.
> ** If checkpointing interval > metric reporting interval, we would be 
> reporting the same values multiple times.
> ** If checkpointing interval < metric reporting interval, we would be 
> randomly dropping metrics for some of the checkpoints.
> For recovery we are missing even the most basic metrics and Flink WebUI 
> support. Also, given that recovery is even less frequent than checkpointing, 
> adding recovery metrics would suffer even more from unnecessarily reporting 
> the same values.
> In this FLIP I'm proposing to add support for reporting traces/spans and to use 
> this mechanism to report checkpointing and recovery traces. I hope that in the 
> future traces will also prove useful in other areas of Flink, like job 
> submission, job state changes, etc. Moreover, as the API to report traces will 
> be added to MetricGroup, users will also be able to access this API. 
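
As a rough illustration of the span idea (this is not the actual API introduced by 
the FLIP), reporting one span per completed checkpoint could look like the sketch 
below. The Span record and SpanReporter interface are hypothetical stand-ins defined 
only for this example; only the scope string mirrors the CheckpointStatsTracker 
scope mentioned elsewhere in this thread.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-ins illustrating span-style reporting. */
final class CheckpointSpanExample {

    /** A finished span: a named operation with start/end timestamps and attributes. */
    record Span(String scope, String name, long startTsMillis, long endTsMillis,
                Map<String, String> attributes) {}

    /** Illustrative reporter; a real trace reporter would ship spans to a tracing backend. */
    interface SpanReporter {
        void report(Span span);
    }

    /** One span per completed checkpoint, instead of periodically re-reported gauges. */
    static void reportCheckpointSpan(
            SpanReporter reporter, long checkpointId, long startTs, long endTs, long fullSizeBytes) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("checkpointId", String.valueOf(checkpointId));
        attrs.put("fullSizeBytes", String.valueOf(fullSizeBytes));
        reporter.report(
                new Span(
                        "org.apache.flink.runtime.checkpoint.CheckpointStatsTracker",
                        "Checkpoint",
                        startTs,
                        endTs,
                        attrs));
    }
}
{code}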



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-05 Thread via GitHub


pnowojski merged PR #23908:
URL: https://github.com/apache/flink/pull/23908


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33978] Introduces AsyncScalarFunction as a new UDF type [flink]

2024-01-05 Thread via GitHub


alpinegizmo commented on PR #23975:
URL: https://github.com/apache/flink/pull/23975#issuecomment-1878877745

   We should add appropriate content to the UDF docs as part of implementing 
this FLIP.
   
   
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
 is the page where that goes. I think a new section right after the section on 
Scalar Functions would be the right place for that. Most of the text needed can 
be copied out of the FLIP, along with the example.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33978) FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2024-01-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33978:
---
Labels: pull-request-available  (was: )

> FLIP-400: AsyncScalarFunction for asynchronous scalar function support
> --
>
> Key: FLINK-33978
> URL: https://issues.apache.org/jira/browse/FLINK-33978
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Alan Sheinberg
>Priority: Major
>  Labels: pull-request-available
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33978] Introduces AsyncScalarFunction as a new UDF type [flink]

2024-01-05 Thread via GitHub


twalthr commented on code in PR #23975:
URL: https://github.com/apache/flink/pull/23975#discussion_r1442740953


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##
@@ -692,6 +734,14 @@ public enum AsyncOutputMode {
 ALLOW_UNORDERED
 }
 
+/** Retry strategy in the case of failure. */
+@Experimental

Review Comment:
   drop the annotation, it should be well tested already



##
docs/layouts/shortcodes/generated/execution_config_configuration.html:
##
@@ -26,6 +26,36 @@
 Duration
 The async timeout for the asynchronous operation to 
complete.
 
+
+table.exec.async-scalar.buffer-capacity Streaming
+10
+Integer
+The max number of async i/o operation that the async lookup 
join can trigger.
+
+
+table.exec.async-scalar.max-attempts Streaming
+3
+Integer
+The max number of async retry attempts to make when 

Review Comment:
   incomplete sentence



##
docs/layouts/shortcodes/generated/execution_config_configuration.html:
##
@@ -26,6 +26,36 @@
 Duration
 The async timeout for the asynchronous operation to 
complete.
 
+
+table.exec.async-scalar.buffer-capacity Streaming
+10
+Integer
+The max number of async i/o operation that the async lookup 
join can trigger.
+
+
+table.exec.async-scalar.max-attempts Streaming
+3
+Integer
+The max number of async retry attempts to make when 
+
+
+table.exec.async-scalar.retry-delay Streaming
+100 ms
+Duration
+The delay to wait before trying again.
+
+
+table.exec.async-scalar.retry-strategy Streaming
+FIXED_DELAY
+Enum
+Restart strategy which will be used, FIXED_DELAY by 
default.Possible values:"FIXED_DELAY"

Review Comment:
   An enum with just one value does not make much sense; what other strategies can 
we offer? I see at least a "no retry" one.



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##
@@ -67,6 +67,8 @@ public final class UserDefinedFunctionHelper {
 
 public static final String SCALAR_EVAL = "eval";
 
+public static final String ASYNC_SCALAR_EVAL = "eval";
+
 public static final String TABLE_EVAL = "eval";

Review Comment:
   Add special validation for this method to check that the return type is 
always void and the first arg is always CompletableFuture. Otherwise users 
might get a weird error for `eval(int i)` in DataTypeExtractor.
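
For context, a sketch of the eval shape being discussed: the return type is void and 
the first parameter is the CompletableFuture that the function completes 
asynchronously, followed by the SQL call arguments. This is only an illustration 
based on the review comment above, not the final API; the executor and the uppercase 
"lookup" are made-up placeholders.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

/** Illustrative sketch of an async scalar UDF shape; not the final AsyncScalarFunction API. */
public class AsyncUppercaseFunction {

    // Placeholder executor; a real function would use a client or thread pool it manages.
    private final Executor executor = ForkJoinPool.commonPool();

    /** void return type; the first argument is the result future, the rest are SQL arguments. */
    public void eval(CompletableFuture<String> result, String input) {
        executor.execute(
                () -> {
                    if (input == null) {
                        result.complete(null);
                    } else {
                        // Stand-in for a real asynchronous lookup or RPC call.
                        result.complete(input.toUpperCase());
                    }
                });
    }
}
{code}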



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java:
##
@@ -155,6 +157,48 @@ public static DataType extractFromMethodParameter(
 paramPos, method.getName(), baseClass.getName()));
 }
 
+/**
+ * Extracts a data type from a method parameter by considering surrounding 
classes and parameter
+ * annotation. This version assumes that the parameter is a generic type, 
and uses the generic
+ * position type as the extracted data type. For example, if the parameter 
is a
+ * CompletableFutureLong and genericPos is 0, it will extract Long.
+ */
+public static DataType extractFromGenericMethodParameter(
+DataTypeFactory typeFactory,
+Class baseClass,
+Method method,
+int paramPos,
+int genericPos) {
+
+Type parameterType = method.getGenericParameterTypes()[paramPos];
+// Trusty library allows to resolve generic types where the type 
resolves to a type with
+// generic parameters...
+parameterType = 
TypeToken.of(baseClass).resolveType(parameterType).getType();

Review Comment:
   Can you give a concrete example where this line is necessary? Personally, I 
would like to avoid dependencies on Guava in these crucial utils. 
`extractDataTypeWithClassContext` is able to serve the same purpose and 
resolve type variables.
   I think we can drop this line if we validate for a CompletableFuture in the 
helper as mentioned above.



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncScalarFunction.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at

Re: [PR] [FLINK-34004] Harden TestingCheckpointIDCounter [flink]

2024-01-05 Thread via GitHub


flinkbot commented on PR #24032:
URL: https://github.com/apache/flink/pull/24032#issuecomment-1878827522

   
   ## CI report:
   
   * 9a8406e252f89c78bcd4c9beb36e967c18a83045 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34004) TestingCheckpointIDCounter can easily lead to NPEs

2024-01-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34004:
---
Labels: pull-request-available  (was: )

> TestingCheckpointIDCounter can easily lead to NPEs
> --
>
> Key: FLINK-34004
> URL: https://issues.apache.org/jira/browse/FLINK-34004
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Tests
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> The TestingCheckpointIDCounter builder doesn't define safe defaults for all 
> builder parameters. Using it can easily lead to surprising null pointer 
> exceptions in tests when code is being modified to call more methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34004) TestingCheckpointIDCounter can easily lead to NPEs

2024-01-05 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-34004:


 Summary: TestingCheckpointIDCounter can easily lead to NPEs
 Key: FLINK-34004
 URL: https://issues.apache.org/jira/browse/FLINK-34004
 Project: Flink
  Issue Type: Technical Debt
  Components: Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.19.0


The TestingCheckpointIDCounter builder doesn't define safe defaults for all 
builder parameters. Using it can easily lead to surprising null pointer 
exceptions in tests when code is being modified to call more methods.
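
To illustrate the pattern (this is not the actual TestingCheckpointIDCounter code; 
all names below are hypothetical), a builder for a test stub can default every unset 
handler to a harmless no-op, so tests only configure the behavior they care about 
and never hit a NullPointerException when production code starts calling additional 
methods:

{code:java}
import java.util.function.LongSupplier;

/** Hypothetical test stub showing safe builder defaults. */
final class TestingCounter {

    private final Runnable startHandler;
    private final LongSupplier getAndIncrementHandler;

    private TestingCounter(Runnable startHandler, LongSupplier getAndIncrementHandler) {
        this.startHandler = startHandler;
        this.getAndIncrementHandler = getAndIncrementHandler;
    }

    void start() {
        startHandler.run();
    }

    long getAndIncrement() {
        return getAndIncrementHandler.getAsLong();
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Safe defaults: unconfigured handlers behave as no-ops instead of being null.
        private Runnable startHandler = () -> {};
        private LongSupplier getAndIncrementHandler = () -> 0L;

        Builder setStartHandler(Runnable handler) {
            this.startHandler = handler;
            return this;
        }

        Builder setGetAndIncrementHandler(LongSupplier handler) {
            this.getAndIncrementHandler = handler;
            return this;
        }

        TestingCounter build() {
            return new TestingCounter(startHandler, getAndIncrementHandler);
        }
    }
}
{code}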



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Add HBase 3.0.1 connector [flink-web]

2024-01-05 Thread via GitHub


MartijnVisser opened a new pull request, #708:
URL: https://github.com/apache/flink-web/pull/708

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32571) Prebuild HBase testing docker image

2024-01-05 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-32571:
---
Fix Version/s: (was: hbase-3.0.1)

> Prebuild HBase testing docker image
> ---
>
> Key: FLINK-32571
> URL: https://issues.apache.org/jira/browse/FLINK-32571
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / HBase
>Reporter: Chesnay Schepler
>Priority: Major
>
> We currently build an HBase Docker image on demand during testing. We can 
> improve reliability and testing times by building this image ahead of time, as 
> the only parameter is the HBase version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34003) Bump CI flink version on flink-connector-hbase

2024-01-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34003:
---
Labels: pull-request-available  (was: )

> Bump CI flink version on flink-connector-hbase
> --
>
> Key: FLINK-34003
> URL: https://issues.apache.org/jira/browse/FLINK-34003
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / HBase
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34003) Bump CI flink version on flink-connector-hbase

2024-01-05 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-34003:
--

 Summary: Bump CI flink version on flink-connector-hbase
 Key: FLINK-34003
 URL: https://issues.apache.org/jira/browse/FLINK-34003
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / HBase
Reporter: Martijn Visser
Assignee: Martijn Visser






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]

2024-01-05 Thread via GitHub


flinkbot commented on PR #24031:
URL: https://github.com/apache/flink/pull/24031#issuecomment-1878781250

   
   ## CI report:
   
   * 056b9af12d0795dadabde47e50dfb040e14ad71d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]

2024-01-05 Thread via GitHub


StefanRRichter commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1442957056


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##
@@ -414,90 +424,70 @@ private void 
restoreWithRescaling(Collection restoreStateHandl
  * temporary RocksDB instance for a key-groups shard. All contents from 
the temporary instance
  * are copied into the real restore instance and then the temporary 
instance is discarded.
  */
-private void restoreWithIngestDbMode(Collection 
restoreStateHandles)
+private void rescaleClipIngestDB(
+Collection 
localKeyedStateHandles,
+byte[] startKeyGroupPrefixBytes,
+byte[] stopKeyGroupPrefixBytes)
 throws Exception {
 
-Preconditions.checkArgument(restoreStateHandles != null && 
!restoreStateHandles.isEmpty());
-
-Map allDownloadSpecs =
-
CollectionUtil.newHashMapWithExpectedSize(restoreStateHandles.size());
-
 final Path absolutInstanceBasePath = 
instanceBasePath.getAbsoluteFile().toPath();
 final Path exportCfBasePath = 
absolutInstanceBasePath.resolve("export-cfs");
 Files.createDirectories(exportCfBasePath);
 
-// Open base db as Empty DB
-this.rocksHandle.openDB();
-
-// Prepare and collect all the download request to pull remote state 
to a local directory
-for (KeyedStateHandle stateHandle : restoreStateHandles) {
-if (!(stateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
-throw unexpectedStateHandleException(
-IncrementalRemoteKeyedStateHandle.class, 
stateHandle.getClass());
-}
-StateHandleDownloadSpec downloadRequest =
-new StateHandleDownloadSpec(
-(IncrementalRemoteKeyedStateHandle) stateHandle,
-
absolutInstanceBasePath.resolve(UUID.randomUUID().toString()));
-allDownloadSpecs.put(stateHandle.getStateHandleId(), 
downloadRequest);
-}
-
-// Process all state downloads
-transferRemoteStateToLocalDirectory(allDownloadSpecs.values());
-
-// Transfer remaining key-groups from temporary instance into base DB
-byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-CompositeKeySerializationUtils.serializeKeyGroup(
-keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
+final Map>
+columnFamilyMetaDataToImport = new HashMap<>();
 
-byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-CompositeKeySerializationUtils.serializeKeyGroup(
-keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
-
-HashMap> 
cfMetaDataToImport =
-new HashMap();
-
-// Insert all remaining state through creating temporary RocksDB 
instances
-for (StateHandleDownloadSpec downloadRequest : 
allDownloadSpecs.values()) {
-logger.info(
-"Starting to restore from state handle: {} with 
rescaling.",
-downloadRequest.getStateHandle());
+try {
+for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
+logger.info(
+"Starting to restore from state handle: {} with 
rescaling using Clip/Ingest DB.",
+stateHandle);
 
-try (RestoredDBInstance tmpRestoreDBInfo =
-restoreTempDBInstanceFromDownloadedState(downloadRequest)) 
{
+try (RestoredDBInstance tmpRestoreDBInfo =
+restoreTempDBInstanceFromLocalState(stateHandle)) {
 
-List tmpColumnFamilyHandles =
-tmpRestoreDBInfo.columnFamilyHandles;
+List tmpColumnFamilyHandles =
+tmpRestoreDBInfo.columnFamilyHandles;
 
-// Clip all tmp db to Range [startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes)
-RocksDBIncrementalCheckpointUtils.clipColumnFamilies(
-tmpRestoreDBInfo.db,
-tmpColumnFamilyHandles,
-startKeyGroupPrefixBytes,
-stopKeyGroupPrefixBytes);
-
-// Export all the Column Families
-Map 
exportedCFAndMetaData =
-RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
-tmpRestoreDBInfo.db,
-tmpColumnFamilyHandles,
-tmpRestoreDBInfo.stateMetaInfoSnapshots,
-exportCfBasePath);
-
-exportedCFAndMetaData.forEach(
-(stateMeta, cfMetaData) -> {
- 

[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-01-05 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803557#comment-17803557
 ] 

Sergey Nuyanzin commented on FLINK-33365:
-

marked as a blocker as mentioned at 
https://lists.apache.org/thread/pc15twvf90l0d6t4vthomh6jzv1h145d

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> execute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> execute the same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In our real-world production environment, this leads to incorrect data output.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34002) Bump CI flink version on flink-connector-elasticsearch

2024-01-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34002:
---
Labels: pull-request-available  (was: )

> Bump CI flink version on flink-connector-elasticsearch
> --
>
> Key: FLINK-34002
> URL: https://issues.apache.org/jira/browse/FLINK-34002
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / ElasticSearch
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34002) Bump CI flink version on flink-connector-elasticsearch

2024-01-05 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-34002:
--

 Summary: Bump CI flink version on flink-connector-elasticsearch
 Key: FLINK-34002
 URL: https://issues.apache.org/jira/browse/FLINK-34002
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / ElasticSearch
Reporter: Martijn Visser
Assignee: Martijn Visser






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2024-01-05 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-33365:

Priority: Blocker  (was: Critical)

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> ---
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>Reporter: macdoor615
>Assignee: david radley
>Priority: Blocker
>  Labels: pull-request-available
> Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> execute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> execute the same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In our real-world production environment, this leads to incorrect data output.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


davidradl closed pull request #79: [FLINK-33365] include filters with Lookup 
joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Bump org.apache.commons:commons-compress from 1.23.0 to 1.24.0 [flink-connector-hbase]

2024-01-05 Thread via GitHub


dependabot[bot] opened a new pull request, #38:
URL: https://github.com/apache/flink-connector-hbase/pull/38

   Bumps org.apache.commons:commons-compress from 1.23.0 to 1.24.0.
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.apache.commons:commons-compress=maven=1.23.0=1.24.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@dependabot merge` will merge this PR after your CI passes on it
   - `@dependabot squash and merge` will squash and merge this PR after your CI 
passes on it
   - `@dependabot cancel merge` will cancel a previously requested merge and 
block automerging
   - `@dependabot reopen` will reopen this PR if it is closed
   - `@dependabot close` will close this PR and stop Dependabot recreating it. 
You can achieve the same result by closing it manually
   - `@dependabot show  ignore conditions` will show all of 
the ignore conditions of the specified dependency
   - `@dependabot ignore this major version` will close this PR and stop 
Dependabot creating any more for this major version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this minor version` will close this PR and stop 
Dependabot creating any more for this minor version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this dependency` will close this PR and stop 
Dependabot creating any more for this dependency (unless you reopen the PR or 
upgrade to it yourself)
   You can disable automated security fix PRs for this repo from the [Security 
Alerts page](https://github.com/apache/flink-connector-hbase/network/alerts).
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Bump org.apache.zookeeper:zookeeper from 3.4.14 to 3.7.2 [flink-connector-hbase]

2024-01-05 Thread via GitHub


dependabot[bot] opened a new pull request, #37:
URL: https://github.com/apache/flink-connector-hbase/pull/37

   Bumps org.apache.zookeeper:zookeeper from 3.4.14 to 3.7.2.
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.apache.zookeeper:zookeeper=maven=3.4.14=3.7.2)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@dependabot merge` will merge this PR after your CI passes on it
   - `@dependabot squash and merge` will squash and merge this PR after your CI 
passes on it
   - `@dependabot cancel merge` will cancel a previously requested merge and 
block automerging
   - `@dependabot reopen` will reopen this PR if it is closed
   - `@dependabot close` will close this PR and stop Dependabot recreating it. 
You can achieve the same result by closing it manually
   - `@dependabot show  ignore conditions` will show all of 
the ignore conditions of the specified dependency
   - `@dependabot ignore this major version` will close this PR and stop 
Dependabot creating any more for this major version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this minor version` will close this PR and stop 
Dependabot creating any more for this minor version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this dependency` will close this PR and stop 
Dependabot creating any more for this dependency (unless you reopen the PR or 
upgrade to it yourself)
   You can disable automated security fix PRs for this repo from the [Security 
Alerts page](https://github.com/apache/flink-connector-hbase/network/alerts).
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1878726394

   Fixed the test failures


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


davidradl closed pull request #79: [FLINK-33365] include filters with Lookup 
joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


davidradl closed pull request #79: [FLINK-33365] include filters with Lookup 
joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


davidradl closed pull request #79: [FLINK-33365] include filters with Lookup 
joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


davidradl closed pull request #79: [FLINK-33365] include filters with Lookup 
joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Add JDBC 3.1.2 connector [flink-web]

2024-01-05 Thread via GitHub


MartijnVisser opened a new pull request, #707:
URL: https://github.com/apache/flink-web/pull/707

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


davidradl closed pull request #79: [FLINK-33365] include filters with Lookup 
joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33897) Allow triggering unaligned checkpoint via CLI

2024-01-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803535#comment-17803535
 ] 

Piotr Nowojski commented on FLINK-33897:


By real-world motivation, I meant: is this really an issue that someone has 
complained about? If not, and this is just a theoretical possibility that comes 
from your observation when implementing FLINK-6755 ("it could be implemented, 
someone might find it useful"), I would put it aside for the time being. 
Honestly, I doubt many users would use this feature. In most cases just 
cancelling the job and restarting with a new configuration would be faster than 
first trying to find out in the docs/user mailing list/Stack Overflow that an 
unaligned checkpoint can actually be triggered from the CLI. This would only be 
useful to a handful of power users, but those should already know that it's 
better to use unaligned checkpoints from the get-go.

{quote}
I'm not very familiar with this part so if you think this is a big change, I 
won't insist on doing it.
{quote}
Adding a new BarrierHandlerState is maybe not a very big change per se, but it 
will visibly increase the complexity of the code when someone needs to 
read/understand it.

{quote}
I do agree we could enable timeout for aligned cp by default, which greatly 
reduce this case
{quote}
Let me start the dev mailing list discussion about that.

> Allow triggering unaligned checkpoint via CLI
> -
>
> Key: FLINK-33897
> URL: https://issues.apache.org/jira/browse/FLINK-33897
> Project: Flink
>  Issue Type: New Feature
>  Components: Command Line Client, Runtime / Checkpointing
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> After FLINK-6755, users can trigger checkpoints through the CLI. However, I 
> noticed there would be value in supporting triggering them in an unaligned way, 
> since the job may encounter high back-pressure and an aligned checkpoint would 
> fail.
>  
> I suggest we provide an option '-unaligned' in the CLI to support that.
>  
> A similar option would also be useful for the REST API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33970][jdbc][docs] Remove dead link [flink-connector-jdbc]

2024-01-05 Thread via GitHub


GOODBOY008 commented on PR #88:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/88#issuecomment-1878625591

   @MartijnVisser I want to fix the dead links in the documentation first. I also 
want to add a dead-link check to 
https://github.com/apache/flink-connector-shared-utils/tree/ci_utils, because the 
Hugo build cannot detect dead links in the documentation. Should we also add this 
check to the flink repo?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


davidradl commented on PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1878620080

   I see there are some test failures - I will look into them
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-33970][jdbc][docs] Add docs check dead links [flink-connector-shared-utils]

2024-01-05 Thread via GitHub


GOODBOY008 opened a new pull request, #32:
URL: https://github.com/apache/flink-connector-shared-utils/pull/32

Add docs check dead links.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33634] Add Conditions to Flink CRD's Status field [flink-kubernetes-operator]

2024-01-05 Thread via GitHub


tagarr commented on code in PR #749:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/749#discussion_r1442824067


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java:
##
@@ -227,4 +235,28 @@ private boolean 
validateDeployment(FlinkResourceContext ctx) {
 }
 return true;
 }
+
+private void setCRStatus(FlinkDeployment flinkApp) {
+final List conditions = new ArrayList<>();
+FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+switch (deploymentStatus.getJobManagerDeploymentStatus()) {

Review Comment:
   I think we need a bit more logic here. We need to check whether the status 
already has a condition in its list with the same type; if it does, we then need 
to check the reason, status and message of that condition and replace the 
existing condition with the new one only if they differ. 
   
   We shouldn't just replace the existing list of conditions with a new one; 
there may be additional conditions of different types added later that would 
get removed. This may be one of the reasons why the tests have had to be 
modified to bump up the number of status changes.
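
A rough sketch of the upsert-style logic described above, with a hypothetical 
Condition record standing in for the real CRD status type; this is not the 
operator's actual code, only an illustration of the suggested check.

{code:java}
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch of the condition-update logic suggested in the review comment. */
final class ConditionUtils {

    /** Minimal stand-in for a Kubernetes-style status condition. */
    record Condition(String type, String status, String reason, String message) {}

    /**
     * Inserts or updates a condition by type. The existing condition is only replaced
     * when status, reason or message actually changed; conditions of other types are
     * left untouched.
     */
    static void upsertCondition(List<Condition> conditions, Condition updated) {
        for (int i = 0; i < conditions.size(); i++) {
            Condition existing = conditions.get(i);
            if (Objects.equals(existing.type(), updated.type())) {
                boolean changed =
                        !Objects.equals(existing.status(), updated.status())
                                || !Objects.equals(existing.reason(), updated.reason())
                                || !Objects.equals(existing.message(), updated.message());
                if (changed) {
                    conditions.set(i, updated);
                }
                return;
            }
        }
        conditions.add(updated);
    }
}
{code}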



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33634] Add Conditions to Flink CRD's Status field [flink-kubernetes-operator]

2024-01-05 Thread via GitHub


tagarr commented on code in PR #749:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/749#discussion_r1442824067


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java:
##
@@ -227,4 +235,28 @@ private boolean 
validateDeployment(FlinkResourceContext ctx) {
 }
 return true;
 }
+
+private void setCRStatus(FlinkDeployment flinkApp) {
+final List conditions = new ArrayList<>();
+FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+switch (deploymentStatus.getJobManagerDeploymentStatus()) {

Review Comment:
   I think we need a bit more logic here. We need to check whether the status 
already has a condition in its list with the same type; if it does, we then need 
to check the reason and message of that condition and replace the existing 
condition with the new one only if they differ. 
   
   We shouldn't just replace the existing list of conditions with a new one; 
there may be additional conditions of different types added later that would 
get removed. This may be one of the reasons why the tests have had to be 
modified to bump up the number of status changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add HBase connector 3.0.0 [flink-web]

2024-01-05 Thread via GitHub


MartijnVisser merged PR #591:
URL: https://github.com/apache/flink-web/pull/591


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-05 Thread Feng Jin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803524#comment-17803524
 ] 

Feng Jin commented on FLINK-33996:
--

[~xuyangzhong]  Introducing a new rule may be a better way, otherwise we would 
need to modify multiple rules listed above.

> Support disabling project rewrite when multiple exprs in the project 
> reference the same sub project field.
> --
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.0
>Reporter: Feng Jin
>Priority: Major
>
> When multiple top projects reference the same bottom project, project rewrite 
> rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explain plan for select a || 'a' as a, a || 'b' as b FROM (select 
> REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
> _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
> ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
> fields=[a])
> {code}
> It can be observed that after the project rewrite, REGEXP_REPLACE is calculated twice. 
> Generally speaking, regular expression matching is a time-consuming operation 
> and we usually do not want it to be calculated multiple times. Therefore, for 
> this scenario, we can support disabling project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>+- LogicalTableScan(table=[[default_catalog, default_database, 
> test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>+- TableSourceScan(table=[[default_catalog, default_database, 
> test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.

2024-01-05 Thread Feng Jin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jin updated FLINK-33996:
-
Description: 
When multiple top projects reference the same bottom project, project rewrite 
rules may result in complex projects being calculated multiple times.

Take the following SQL as an example:
{code:sql}
create table test_source(a varchar) with ('connector'='datagen');

explain plan for select a || 'a' as a, a || 'b' as b FROM (select 
REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
{code}
The final SQL plan is as follows:
{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])

== Optimized Physical Plan ==
Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
_UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
_UTF-16LE'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])

== Optimized Execution Plan ==
Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])
{code}
It can be observed that after the project rewrite, REGEXP_REPLACE is calculated twice. 
Generally speaking, regular expression matching is a time-consuming operation 
and we usually do not want it to be calculated multiple times. Therefore, for 
this scenario, we can support disabling project rewrite.

After disabling some rules, the final plan we obtained is as follows:
{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])

== Optimized Physical Plan ==
Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])

== Optimized Execution Plan ==
Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])
{code}
After testing, we probably need to modify these few rules:

org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule

org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule

org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule

  was:
When multiple top projects reference the same bottom project, project rewrite 
rules may result in complex projects being calculated multiple times.

Take the following SQL as an example:

{code:sql}
create table test_source(a varchar) with ('connector'='datagen');

explain plan for select a || 'a' as a, a || 'b' as b FROM (select 
REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
{code}


The final SQL plan is as follows:


{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])

== Optimized Physical Plan ==
Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
_UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), 
_UTF-16LE'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])

== Optimized Execution Plan ==
Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, 
||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])
{code}

It can be observed that after the project rewrite, REGEXP_REPLACE is calculated 
twice. Generally speaking, regular expression matching is a time-consuming 
operation and we usually do not want it to be computed multiple times. Therefore, 
for this scenario, we can support disabling the project rewrite.

After disabling some rules, the final plan we obtained is as follows:


{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])

== Optimized Physical Plan ==
Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], 
fields=[a])

== Optimized Execution Plan ==
Calc(select=[||(a, 'a') AS 

Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-05 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1442770858


##
docs/content/docs/ops/traces.md:
##
@@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint 
once checkpoint reach
   
   
 
-  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker
+  org.apache.flink.runtime.checkpoint.CheckpointStatsTracker

Review Comment:
   I've added line breaks to the scope. More readable and the effect is the 
same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-05 Thread via GitHub


pnowojski commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1442749273


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -155,7 +167,12 @@ public CheckpointStatsSnapshot createSnapshot() {
 counts.createSnapshot(),
 summary.createSnapshot(),
 history.createSnapshot(),
-latestRestoredCheckpoint);
+jobInitializationMetricsBuilder
+.map(
+JobInitializationMetricsBuilder
+
::buildRestoredCheckpointStats)
+.orElse(Optional.empty())
+.orElse(null));

Review Comment:
   Ahh, nice!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34001) doc of "Configure Operator-level State TTL" error

2024-01-05 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-34001:
---
Component/s: Documentation
 Table SQL / API
 (was: API / State Processor)

> doc of "Configure Operator-level State TTL" error
> -
>
> Key: FLINK-34001
> URL: https://issues.apache.org/jira/browse/FLINK-34001
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.18.0
>Reporter: yong yang
>Priority: Major
>
> doc:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which 
> means the state retention is not enabled. 
>  
> but my test shows:
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which 
> means the state is kept permanently!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1442730232


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +97,115 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
+
 for (int i = 0; i < keyNames.length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+int processedPushdownParamsIndex = 0;
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+
+/*
+ * This replace seems like it should be using a Flink class to 
resolve the parameter. It does not
+ * effect the dialects as the placeholder comes from 
JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ *
+ * An entry in the resolvedPredicates list may have more than 
one associated pushdown parameter, for example
+ * a query like this : ... on e.type = 2 and (e.age = 50 OR 
height > 90)  and a.ip = e.ip;
+ * will have 2 resolvedPredicates and 3 pushdownParams. The 
2nd and 3rd pushdownParams will be for the second
+ * resolvedPredicate.
+ *
+ */
+ArrayList paramsForThisPredicate = new ArrayList();
+char placeholderChar =
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER
+.charAt(0);
+
+int count =
+(int) resolvedPredicate.chars().filter(ch -> ch == 
placeholderChar).count();
+
+for (int j = processedPushdownParamsIndex;
+j < processedPushdownParamsIndex + count;
+j++) {
+
paramsForThisPredicate.add(this.pushdownParams[j].toString());
+}
+processedPushdownParamsIndex = processedPushdownParamsIndex + 
count;

Review Comment:
   I have implemented an approach that I think is much more reliable. 
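
   For readers following the thread, a self-contained sketch of the parameter-slicing idea described in the comment inside the diff above (the placeholder character, predicate strings and values are assumptions for illustration, not the connector's actual constants):
   ```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PushdownParamSlicer {
    public static void main(String[] args) {
        // Assumption: '?' stands in for
        // JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER.
        char placeholder = '?';

        // Example from the comment above: 2 resolved predicates, 3 pushdown parameters.
        List<String> resolvedPredicates = Arrays.asList("type = ?", "(age = ? OR height > ?)");
        Object[] pushdownParams = {2, 50, 90};

        int consumed = 0;
        for (String predicate : resolvedPredicates) {
            int count = (int) predicate.chars().filter(ch -> ch == placeholder).count();
            List<String> paramsForThisPredicate = new ArrayList<>();
            for (int j = consumed; j < consumed + count; j++) {
                paramsForThisPredicate.add(pushdownParams[j].toString());
            }
            consumed += count;
            System.out.println(predicate + " -> " + paramsForThisPredicate);
        }
    }
}
   ```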



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

2024-01-05 Thread via GitHub


RocMarshal commented on code in PR #2:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1442655533


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.statements;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a 
specific type of record.

Review Comment:
   How about 
   
   ```
* Sets {@link PreparedStatement} parameters that's used in JDBC Sink based 
on a specified type of the record.
   ```
   
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28915) Make application mode could support remote DFS schema(e.g. S3, OSS, HDFS, etc.)

2024-01-05 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-28915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803501#comment-17803501
 ] 

Márton Balassi commented on FLINK-28915:


Due to the 
[inactivity|https://github.com/apache/flink/pull/20779#issuecomment-1878422305] 
on the PRs [~ferenc-csaky] has offered to finish this implementation using the 
original commit of [~hjw] as a base. 

> Make application mode could support remote DFS schema(e.g. S3, OSS, HDFS, 
> etc.)
> ---
>
> Key: FLINK-28915
> URL: https://issues.apache.org/jira/browse/FLINK-28915
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, flink-contrib
>Reporter: hjw
>Assignee: hjw
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> As the Flink documentation shows, local is the only supported scheme in native 
> K8s deployment.
> Is there a plan to support the S3 filesystem? Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-05 Thread via GitHub


davidradl closed pull request #79: [FLINK-33365] include filters with Lookup 
joins
URL: https://github.com/apache/flink-connector-jdbc/pull/79


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-05 Thread via GitHub


JunRuiLee commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1442718242


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -1201,6 +1212,9 @@ public void configure(ReadableConfig configuration, 
ClassLoader classLoader) {
 .getOptional(ExecutionOptions.SNAPSHOT_COMPRESSION)
 .ifPresent(this::setUseSnapshotCompression);
 
RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy);
+configuration
+.getOptional(RestartStrategyOptions.RESTART_STRATEGY)
+.ifPresent(s -> this.setRestartStrategy(configuration));

Review Comment:
   Yes, you're right. I misunderstood the code when I was reviewing it; sorry 
about that. I've fixed the error now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]

2024-01-05 Thread via GitHub


RocMarshal commented on code in PR #2:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1442712519


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.statements;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a 
specific type of record.
+ */
+@PublicEvolving
+public interface JdbcQueryStatement<T> extends Serializable {
+String query();
+
+void map(PreparedStatement ps, T data) throws SQLException;

Review Comment:
   - `data` -> `record`
   - The method described by the interface here is more like filling the data 
of a record into a PreparedStatement. So how about
- changing the interface name to something like `JdbcStatementFiller` or 
`JdbcStatementRender`
- changing the method name to `fill` or `render`
   ?
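
   Whatever the final naming, a minimal sketch of how a user might implement the interface in this diff (the Book type, table and SQL are invented; it assumes the JdbcQueryStatement<T> interface shown above is on the classpath):
   ```
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical record type, used only for this sketch.
class Book {
    final long id;
    final String title;

    Book(long id, String title) {
        this.id = id;
        this.title = title;
    }
}

// Fills the statement parameters from one record, matching query() + map(...) above.
class BookQueryStatement implements JdbcQueryStatement<Book> {
    @Override
    public String query() {
        return "INSERT INTO books (id, title) VALUES (?, ?)";
    }

    @Override
    public void map(PreparedStatement ps, Book record) throws SQLException {
        ps.setLong(1, record.id);
        ps.setString(2, record.title);
    }

    public static void main(String[] args) {
        System.out.println(new BookQueryStatement().query());
    }
}
   ```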



##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.datasource.statements;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a 
specific type of record.

Review Comment:
   How about 
   
   ```
* Sets {@link PreparedStatement} parameters that's used in JDBC Sink based 
on a specified type of the record.
   ```
   
   ?



##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##
@@ -0,0 +1,271 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/** A simple {@link Xid} implementation. */
+public class TransactionId implements Xid, Serializable {
+
+private static final long serialVersionUID = 1L;
+
+private static final int FORMAT_ID = 202;
+
+private final byte[] jobId;
+private final int subtaskId;
+private final int numberOfSubtasks;
+private final long checkpointId;
+private final int attempts;
+private final boolean restored;
+
+private TransactionId(
+byte[] jobId,
+int subtaskId,
+int numberOfSubtasks,
+long checkpointId,
+int attempts,
+boolean restored) {
+this.jobId = jobId;
+ 

Re: [PR] [FLINK-20281][table] support consuming cdc stream about window aggregate [flink]

2024-01-05 Thread via GitHub


flinkbot commented on PR #24030:
URL: https://github.com/apache/flink/pull/24030#issuecomment-1878444332

   
   ## CI report:
   
   * fa4675861793c8cd42d7ece15334879b2ec54953 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-20281][table] support consuming cdc stream about window aggregate [flink]

2024-01-05 Thread via GitHub


xuyangzhong opened a new pull request, #24030:
URL: https://github.com/apache/flink/pull/24030

   ## What is the purpose of the change
   
   Currently, window aggregation doesn't support consuming a changelog stream. 
   
   See more at [FLINK-20281](https://issues.apache.org/jira/browse/FLINK-20281) 
and [FLINK-27539](https://issues.apache.org/jira/browse/FLINK-27539)
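
   For context, a hedged sketch of the kind of query this change is meant to enable (table name, schema and connector options are invented for illustration):
   ```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowAggOverChangelog {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Changelog (upsert) source; schema and connector options are made up for this sketch.
        tEnv.executeSql(
                "CREATE TABLE orders_cdc ("
                        + " order_id STRING,"
                        + " amount DECIMAL(10, 2),"
                        + " ts TIMESTAMP(3),"
                        + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,"
                        + " PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'upsert-kafka',"
                        + " 'topic' = 'orders',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'key.format' = 'csv',"
                        + " 'value.format' = 'csv')");
        // The window TVF aggregation that this change is meant to let consume updates.
        tEnv.executeSql(
                        "SELECT window_start, window_end, SUM(amount) AS total_amount"
                                + " FROM TABLE(TUMBLE(TABLE orders_cdc, DESCRIPTOR(ts), INTERVAL '1' MINUTES))"
                                + " GROUP BY window_start, window_end")
                .print();
    }
}
   ```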
   
   ## Brief change log
   
 - *Update the logic for inferring ModifyKind and UpdateKind for the window 
agg node*
 - *Introduce a count in the window agg so that, when there is no data in a window, 
the agg result is not emitted*
 - *Add tests for TUMBLE, HOP and CUMULATE window TVFs consuming a CDC source*
   
   ## Verifying this change
   
   Tests are added for TUMBLE, HOP and CUMULATE window tvf to consume cdc source
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? no need to update doc
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-20281) Window aggregation supports changelog stream input

2024-01-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-20281:
---
Labels: auto-deprioritized-major auto-deprioritized-minor 
pull-request-available  (was: auto-deprioritized-major auto-deprioritized-minor)

> Window aggregation supports changelog stream input
> --
>
> Key: FLINK-20281
> URL: https://issues.apache.org/jira/browse/FLINK-20281
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: Jark Wu
>Assignee: xuyang
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
> Attachments: screenshot-1.png
>
>
> Currently, window aggregation doesn't support consuming a changelog stream. 
> This makes it impossible to do a window aggregation on changelog sources 
> (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-05 Thread via GitHub


1996fanrui commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1442706214


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -453,7 +440,10 @@ public StreamExecutionEnvironment setBufferTimeout(long 
timeoutMillis) {
 throw new IllegalArgumentException("Timeout of buffer must be 
non-negative or -1");
 }
 
-this.bufferTimeout = timeoutMillis;
+if (timeoutMillis == ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT) 
{
+this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, 
false);
+}
+this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT, 
Duration.ofMillis(timeoutMillis));
 return this;

Review Comment:
   Thanks for the clarification. It's fine for me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-05 Thread via GitHub


1996fanrui commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1442704443


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -1201,6 +1212,9 @@ public void configure(ReadableConfig configuration, 
ClassLoader classLoader) {
 .getOptional(ExecutionOptions.SNAPSHOT_COMPRESSION)
 .ifPresent(this::setUseSnapshotCompression);
 
RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy);
+configuration
+.getOptional(RestartStrategyOptions.RESTART_STRATEGY)
+.ifPresent(s -> this.setRestartStrategy(configuration));

Review Comment:
   I don't know why we're removing it. If it's removed, no one calls `private void 
setRestartStrategy(ReadableConfig configuration) {`, right? And then these options 
cannot be added to the configuration of ExecutionConfig. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28915] Flink Native k8s mode jar localtion support s3 schema. [flink]

2024-01-05 Thread via GitHub


mbalassi commented on PR #20779:
URL: https://github.com/apache/flink/pull/20779#issuecomment-1878422305

   Hi @SwimSweet, given your lack of response @ferenc-csaky offered to build on 
your work and take it forward based on my comments above. Hope that is OK, he 
is planning to post an updated PR next week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-05 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803484#comment-17803484
 ] 

Rui Fan commented on FLINK-33856:
-

{quote}I would suggest that both of you chat offline about the scope of the 
changes in Rui Fan's FLIP and/or eventual division of work. I'm not sure if Rui 
Fan plans to add per task/subtask spans for checkpoints and/or recovery.
{quote}
FLIP-412 focuses on adding spans for the time spent in each stage of starting a 
Flink job. According to my current plan, they will be at the job level rather than 
the task or subtask level.

> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the interaction performance with the external 
> file system has a great impact on the overall checkpoint duration. Therefore, 
> bottlenecks become easy to observe by adding performance indicators where the 
> task interacts with the external file storage system. These include: the 
> file write rate, the latency to write the file, and the latency to close the 
> file.
> Adding the above metrics on the Flink side has the following advantages: it is 
> convenient for comparing E2E checkpoint times across tasks, and it does not 
> need to distinguish the type of external storage system, since it can be 
> implemented uniformly in FsCheckpointStreamFactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34001) doc of "Configure Operator-level State TTL" error

2024-01-05 Thread yong yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yong yang updated FLINK-34001:
--
Summary: doc of "Configure Operator-level State TTL" error  (was: doc diff 
from code)

> doc of "Configure Operator-level State TTL" error
> -
>
> Key: FLINK-34001
> URL: https://issues.apache.org/jira/browse/FLINK-34001
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: 1.18.0
>Reporter: yong yang
>Priority: Major
>
> doc:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which 
> means the state retention is not enabled. 
>  
> but my test shows:
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which 
> means the state is kept permanently!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34001) doc diff from code

2024-01-05 Thread yong yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803483#comment-17803483
 ] 

yong yang commented on FLINK-34001:
---

{code:java}
// code placeholder
// plan1.json

{
  "flinkVersion" : "1.18",
  "nodes" : [ {
    "id" : 7,
    "type" : "stream-exec-table-source-scan_1",
    "scanTableSource" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`s1`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "id",
              "dataType" : "VARCHAR(2147483647)"
            }, {
              "name" : "name",
              "dataType" : "VARCHAR(2147483647)"
            } ],
            "watermarkSpecs" : [ ]
          },
          "partitionKeys" : [ ],
          "options" : {
            "properties.bootstrap.servers" : "localhost:9092",
            "connector" : "kafka",
            "format" : "csv",
            "topic" : "s1",
            "properties.group.id" : "g1",
            "scan.startup.mode" : "latest-offset"
          }
        }
      }
    },
    "outputType" : "ROW<`id` VARCHAR(2147483647), `name` VARCHAR(2147483647)>",
    "description" : "TableSourceScan(table=[[default_catalog, default_database, 
s1]], fields=[id, name])",
    "inputProperties" : [ ]
  }, {
    "id" : 8,
    "type" : "stream-exec-exchange_1",
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "HASH",
        "keys" : [ 0 ]
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`id` VARCHAR(2147483647), `name` VARCHAR(2147483647)>",
    "description" : "Exchange(distribution=[hash[id]])"
  }, {
    "id" : 9,
    "type" : "stream-exec-table-source-scan_1",
    "scanTableSource" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`s2`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "id",
              "dataType" : "VARCHAR(2147483647) NOT NULL"
            }, {
              "name" : "age",
              "dataType" : "INT"
            } ],
            "watermarkSpecs" : [ ],
            "primaryKey" : {
              "name" : "PK_3386",
              "type" : "PRIMARY_KEY",
              "columns" : [ "id" ]
            }
          },
          "partitionKeys" : [ ],
          "options" : {
            "properties.bootstrap.servers" : "localhost:9092",
            "key.format" : "csv",
            "topic" : "s3",
            "connector" : "upsert-kafka",
            "value.format" : "csv"
          }
        }
      }
    },
    "outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>",
    "description" : "TableSourceScan(table=[[default_catalog, default_database, 
s2]], fields=[id, age])",
    "inputProperties" : [ ]
  }, {
    "id" : 10,
    "type" : "stream-exec-exchange_1",
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "HASH",
        "keys" : [ 0 ]
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>",
    "description" : "Exchange(distribution=[hash[id]])"
  }, {
    "id" : 11,
    "type" : "stream-exec-changelog-normalize_1",
    "configuration" : {
      "table.exec.mini-batch.enabled" : "false",
      "table.exec.mini-batch.size" : "-1"
    },
    "uniqueKeys" : [ 0 ],
    "generateUpdateBefore" : true,
    "state" : [ {
      "index" : 0,
      "ttl" : "0 ms",
      "name" : "changelogNormalizeState"
    } ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>",
    "description" : "ChangelogNormalize(key=[id])"
  }, {
    "id" : 12,
    "type" : "stream-exec-exchange_1",
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "HASH",
        "keys" : [ 0 ]
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>",
    "description" : "Exchange(distribution=[hash[id]])"
  }, {
    "id" : 13,
    "type" : "stream-exec-join_1",
    "joinSpec" : {
      "joinType" : "INNER",
      "leftKeys" : [ 0 ],
      "rightKeys" : [ 0 ],
      "filterNulls" : [ true ],
      "nonEquiCondition" : null
    },
    "rightUpsertKeys" : [ [ 0 ] ],
    "state" : [ {
      "index" : 0,
      "ttl" : "0 ms",
      "name" : "leftState"
    }, {
      "index" : 1,
      "ttl" : "0 ms",
      "name" : "rightState"
    } ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    }, {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      

[jira] [Commented] (FLINK-34001) doc diff from code

2024-01-05 Thread yong yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803482#comment-17803482
 ] 

yong yang commented on FLINK-34001:
---

{code:java}
// code placeholder
package com.yy.state.OperatorStateTTL

import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, 
StreamTableEnvironment}

import java.time.ZoneId
import scala.util.Random

/**
 * See how EXECUTE PLAN and statement sets can be combined.
 * When packaged as a jar and submitted as a Flink job (for the Flink web UI), it fails with:
 * Cannot have more than one execute() or executeAsync() call in a single environment.
 * Running it from IDEA works fine.
 * Reference: https://blog.csdn.net/tianlangstudio/article/details/123086300
 */
object TTLDemoV2 {
  def main(args: Array[String]): Unit = {
val conf = new Configuration
conf.setInteger(RestOptions.PORT, 28080)

// Start from the specified checkpoint; if not configured, start without state
//
conf.setString("execution.savepoint.path","file:///Users/thomas990p/checkpointdir/41f8644ad4492002188d8d4e1009/chk-136")

//  Do not use a local environment; with the approach below, multiple executeSql calls all take effect
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Keep checkpoint data when the local job is stopped

env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

env.setParallelism(1)

env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
env.setStateBackend(new 
FsStateBackend("file:///Users/thomas990p/checkpointdir"))
env.disableOperatorChaining() // disable operator chaining globally

val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

// Use the China time zone
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))


tEnv.executeSql(
  """
|CREATE TABLE s1 (
|  id String
|  ,name string
|) WITH (
|  'connector' = 'kafka',
|  'topic' = 's1',
|  'scan.startup.mode' = 'latest-offset',
|  'properties.group.id' = 'g1',
|  'properties.bootstrap.servers' = 'localhost:9092',
|  'format' = 'csv'
|)
|""".stripMargin)

/*
Configuring 'scan.startup.mode' = 'latest-offset' is not allowed, otherwise it fails with:
Unsupported options found for 'upsert-kafka'. It must consume from earliest and cannot be changed.
*/
tEnv.executeSql(
  """
|CREATE TABLE s2 (
|  id String
|  ,age int
|  ,primary key(id) not enforced
|) WITH (
|  'connector' = 'upsert-kafka',
|  'topic' = 's3',
|  'properties.bootstrap.servers' = 'localhost:9092',
|  'key.format' = 'csv'
|  ,'value.format' = 'csv'
|)
|""".stripMargin)

tEnv.executeSql(
  """
|CREATE TABLE s1_sink1 (
|  id String
|  ,name string
|) WITH (
|  'connector' = 'print'
|  ,'print-identifier'='s1 sink'
|)
|""".stripMargin)
tEnv.executeSql(
  """
|CREATE TABLE s2_sink1 (
|  id String
|  ,age int
|) WITH (
|  'connector' = 'print'
|  ,'print-identifier'='s2 sink'
|)
|""".stripMargin)

tEnv.executeSql(
  """
|CREATE TABLE sink1 (
|  id String
|  ,name string
|  ,age int
|) WITH (
|  'connector' = 'print'
|  ,'print-identifier'=''
|)
|""".stripMargin)


tEnv.executeSql("insert into s1_sink1 select * from s1")
tEnv.executeSql("insert into s2_sink1 select * from s2")
tEnv.executeSql("EXECUTE PLAN 
'file:///Users/thomas990p/flink-plain/plan1.json' ") // two 0ms keep left and 
right state forever 



  }

}
 {code}

> doc diff from code
> --
>
> Key: FLINK-34001
> URL: https://issues.apache.org/jira/browse/FLINK-34001
> Project: Flink
>  Issue Type: Bug
>  Components: API / State Processor
>Affects Versions: 1.18.0
>Reporter: yong yang
>Priority: Major
>
> doc:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which 
> means the state retention is not enabled. 
>  
> but my test shows:
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which 
> means the state is kept permanently!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34001) doc diff from code

2024-01-05 Thread yong yang (Jira)
yong yang created FLINK-34001:
-

 Summary: doc diff from code
 Key: FLINK-34001
 URL: https://issues.apache.org/jira/browse/FLINK-34001
 Project: Flink
  Issue Type: Bug
  Components: API / State Processor
Affects Versions: 1.18.0
Reporter: yong yang


doc:

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time

The current TTL value for both left and right side is {{{}"0 ms"{}}}, which 
means the state retention is not enabled. 

 

but my test shows:

The current TTL value for both left and right side is {{{}"0 ms"{}}}, which 
means the state is kept permanently!
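
For reference, a minimal sketch (values are illustrative) of setting a non-zero pipeline-level idle state retention, as opposed to the per-operator "ttl" entries in the compiled plan, so that the state does eventually expire:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.time.Duration;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // With the default of "0 ms" the join / changelog-normalize state is kept forever;
        // a non-zero idle state retention lets Flink expire state for idle keys.
        tEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
    }
}
{code}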



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33993) Ineffective scaling detection events are misleading

2024-01-05 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-33993.
--
Resolution: Fixed

> Ineffective scaling detection events are misleading
> ---
>
> Key: FLINK-33993
> URL: https://issues.apache.org/jira/browse/FLINK-33993
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> When the ineffective scaling decision feature is turned off, events are 
> regenerated which look like this:
> {noformat}
> Skipping further scale up after ineffective previous scale up for 
> 65c763af14a952c064c400d516c25529
> {noformat}
> This is misleading because no action will be taken. It is fair to inform 
> users about ineffective scale up even when the feature is disabled but a 
> different message should be printed to convey that no action will be taken.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33993] Fix misleading events for scaling effectiveness detection [flink-kubernetes-operator]

2024-01-05 Thread via GitHub


mxm commented on PR #748:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/748#issuecomment-1878408347

   Thanks for the quick review Rui!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803478#comment-17803478
 ] 

Piotr Nowojski edited comment on FLINK-33856 at 1/5/24 9:58 AM:


{quote}
Maybe a new FLIP that supports a task-level trace reporter can be built? I'm 
willing to participate in the development.
{quote}
Please again check the FLIP-384 discussions. I was highlighting there a couple 
of difficulties:
{quote}
However, if we would like to create true distributed traces, with spans
reported from many different
components, potentially both on JM and TM, the problem is a bit deeper. The
issue in that case is how
to actually fill out `parent_id` and `trace_id`? Passing some context
entity as a java object would be
unfeasible. That would require too many changes in too many places. I think
the only realistic way
to do it, would be to have a deterministic generator of `parent_id` and
`trace_id` values.

For example we could create the parent trace/span of the checkpoint on JM,
and set those ids to
something like: `jobId#attemptId#checkpointId`. Each subtask then could
re-generate those ids
and subtasks' checkpoint span would have an id of
`jobId#attemptId#checkpointId#subTaskId`.
Note that this is just an example, as most likely distributed spans for
checkpointing do not make
sense, as we can generate them much easier on the JM anyway.
{quote}
https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4

At the same time:
{quote}
 I am worried that a large amount of data aggregation to JM may have 
performance problems.
{quote}
I wouldn't worry about that too much. This data is already aggregated on the JM 
from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. 
Besides, it's just a single RPC from subtask -> JM per checkpoint. If that 
becomes a problem, we would have problems in many different areas as well (for 
example {{notifyCheckpointCompleted}} is a very similar call but the other 
direction).

Also AFAIR there are/were different ideas how to solve this potential 
bottleneck in a more generic way (having multiple job coordinators in the 
cluster to spread the load).

[~hejufang001] I would suggest that both of you chat offline about the scope of 
the changes in [~fanrui]'s FLIP and/or eventual division of work. I'm not sure 
if [~fanrui] plans to add per task/subtask spans for checkpoints and/or 
recovery.


was (Author: pnowojski):
{quote}
Maybe a new FLIP that supports a task-level trace reporter can be built? I'm 
willing to participate in the development.
{quote}
Please again check the FLIP-384 discussions. I was highlighting there a couple 
of difficulties:
{quote}
However, if we would like to create true distributed traces, with spans
reported from many different
components, potentially both on JM and TM, the problem is a bit deeper. The
issue in that case is how
to actually fill out `parent_id` and `trace_id`? Passing some context
entity as a java object would be
unfeasible. That would require too many changes in too many places. I think
the only realistic way
to do it, would be to have a deterministic generator of `parent_id` and
`trace_id` values.

For example we could create the parent trace/span of the checkpoint on JM,
and set those ids to
something like: `jobId#attemptId#checkpointId`. Each subtask then could
re-generate those ids
and subtasks' checkpoint span would have an id of
`jobId#attemptId#checkpointId#subTaskId`.
Note that this is just an example, as most likely distributed spans for
checkpointing do not make
sense, as we can generate them much easier on the JM anyway.
{quote}
https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4

At the same time:
{quote}
 I am worried that a large amount of data aggregation to JM may have 
performance problems.
{quote}
I wouldn't worry about that too much. This data is already aggregated on the JM 
from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. 
Besides, it's just a single RPC from subtask -> JM per checkpoint. If that 
becomes a problem, we would have problems in many different areas as well (for 
example {{notifyCheckpointCompleted}} is a very similar call but the other 
direction).

Also AFAIR there are/were different ideas how to solve this potential 
bottleneck in a more generic way (having multiple job coordinators in the 
cluster to spread the load).



> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 

Re: [PR] [FLINK-33993] Fix misleading events for scaling effectiveness detection [flink-kubernetes-operator]

2024-01-05 Thread via GitHub


mxm merged PR #748:
URL: https://github.com/apache/flink-kubernetes-operator/pull/748


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803478#comment-17803478
 ] 

Piotr Nowojski edited comment on FLINK-33856 at 1/5/24 9:56 AM:


{quote}
Maybe a new FLIP that supports a task-level trace reporter can be built? I'm 
willing to participate in the development.
{quote}
Please again check the FLIP-384 discussions. I was highlighting there a couple 
of difficulties:
{quote}
However, if we would like to create true distributed traces, with spans
reported from many different
components, potentially both on JM and TM, the problem is a bit deeper. The
issue in that case is how
to actually fill out `parent_id` and `trace_id`? Passing some context
entity as a java object would be
unfeasible. That would require too many changes in too many places. I think
the only realistic way
to do it, would be to have a deterministic generator of `parent_id` and
`trace_id` values.

For example we could create the parent trace/span of the checkpoint on JM,
and set those ids to
something like: `jobId#attemptId#checkpointId`. Each subtask then could
re-generate those ids
and subtasks' checkpoint span would have an id of
`jobId#attemptId#checkpointId#subTaskId`.
Note that this is just an example, as most likely distributed spans for
checkpointing do not make
sense, as we can generate them much easier on the JM anyway.
{quote}
https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4

At the same time:
{quote}
 I am worried that a large amount of data aggregation to JM may have 
performance problems.
{quote}
I wouldn't worry about that too much. This data is already aggregated on the JM 
from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. 
Besides, it's just a single RPC from subtask -> JM per checkpoint. If that 
becomes a problem, we would have problems in many different areas as well (for 
example {{notifyCheckpointCompleted}} is a very similar call but the other 
direction).

Also AFAIR there are/were different ideas how to solve this potential 
bottleneck in a more generic way (having multiple job coordinators in the 
cluster to spread the load).




was (Author: pnowojski):
{quote}
Maybe a new FLIP that supports a task-level trace reporter can be built? I'm 
willing to participate in the development.
{quote}
Please again check the FLIP-384 discussions. I was highlighting there a couple 
of difficulties:
{quote}
However, if we would like to create true distributed traces, with spans
reported from many different
components, potentially both on JM and TM, the problem is a bit deeper. The
issue in that case is how
to actually fill out `parent_id` and `trace_id`? Passing some context
entity as a java object would be
unfeasible. That would require too many changes in too many places. I think
the only realistic way
to do it, would be to have a deterministic generator of `parent_id` and
`trace_id` values.

For example we could create the parent trace/span of the checkpoint on JM,
and set those ids to
something like: `jobId#attemptId#checkpointId`. Each subtask then could
re-generate those ids
and subtasks' checkpoint span would have an id of
`jobId#attemptId#checkpointId#subTaskId`.
Note that this is just an example, as most likely distributed spans for
checkpointing do not make
sense, as we can generate them much easier on the JM anyway.
{quote}
https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4

At the same time:
{quote}
 I am worried that a large amount of data aggregation to JM may have 
performance problems.
{quote}
I wouldn't worry about that too much. Those data are already aggregated on the 
JM from all of the TMs via {{CheckpointMetricsBuilder}} and 
{{CheckpointMetrics}}. Besides, it's just a single RPC from subtask -> JM per 
checkpoint. If that becomes a problem, we would have problems in many different 
areas as well (for example {{notifyCheckpointCompleted}} is a very similar call 
but the other direction).

Also AFAIR there are/were different ideas how to solve this potential 
bottleneck in a more generic way (having multiple job coordinators in the 
cluster to spread the load).



> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the interaction performance with the external 
> file system 

[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803478#comment-17803478
 ] 

Piotr Nowojski commented on FLINK-33856:


{quote}
Maybe a new FLIP that supports a task-level trace reporter can be built? I'm 
willing to participate in the development.
{quote}
Please again check the FLIP-384 discussions. I was highlighting there a couple 
of difficulties:
{quote}
However, if we would like to create true distributed traces, with spans
reported from many different
components, potentially both on JM and TM, the problem is a bit deeper. The
issue in that case is how
to actually fill out `parent_id` and `trace_id`? Passing some context
entity as a java object would be
unfeasible. That would require too many changes in too many places. I think
the only realistic way
to do it, would be to have a deterministic generator of `parent_id` and
`trace_id` values.

For example we could create the parent trace/span of the checkpoint on JM,
and set those ids to
something like: `jobId#attemptId#checkpointId`. Each subtask then could
re-generate those ids
and subtasks' checkpoint span would have an id of
`jobId#attemptId#checkpointId#subTaskId`.
Note that this is just an example, as most likely distributed spans for
checkpointing do not make
sense, as we can generate them much easier on the JM anyway.
{quote}
https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4
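
A minimal sketch of the deterministic id scheme described in the quote (the '#' format and the names come from the quoted example; nothing here is existing Flink API):
{code:java}
import java.util.UUID;

public class DeterministicSpanIds {
    // Illustrative only: derive reproducible span ids from job/attempt/checkpoint
    // coordinates, so JM and TMs could agree on parent ids without passing a context object.
    static String checkpointSpanId(String jobId, int attemptId, long checkpointId) {
        return jobId + "#" + attemptId + "#" + checkpointId;
    }

    static String subtaskSpanId(String jobId, int attemptId, long checkpointId, int subtaskId) {
        return checkpointSpanId(jobId, attemptId, checkpointId) + "#" + subtaskId;
    }

    public static void main(String[] args) {
        String jobId = UUID.randomUUID().toString();
        System.out.println(checkpointSpanId(jobId, 1, 42));
        System.out.println(subtaskSpanId(jobId, 1, 42, 3));
    }
}
{code}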

At the same time:
{quote}
 I am worried that a large amount of data aggregation to JM may have 
performance problems.
{quote}
I wouldn't worry about that too much. Those data are already aggregated on the 
JM from all of the TMs via {{CheckpointMetricsBuilder}} and 
{{CheckpointMetrics}}. Besides, it's just a single RPC from subtask -> JM per 
checkpoint. If that becomes a problem, we would have problems in many different 
areas as well (for example {{notifyCheckpointCompleted}} is a very similar call 
but the other direction).

Also AFAIR there are/were different ideas how to solve this potential 
bottleneck in a more generic way (having multiple job coordinators in the 
cluster to spread the load).



> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the interaction performance with the external 
> file system has a great impact on the overall checkpoint duration. Therefore, 
> bottlenecks become easy to observe by adding performance indicators where the 
> task interacts with the external file storage system. These include: the 
> file write rate, the latency to write the file, and the latency to close the 
> file.
> Adding the above metrics on the Flink side has the following advantages: it is 
> convenient for comparing E2E checkpoint times across tasks, and it does not 
> need to distinguish the type of external storage system, since it can be 
> implemented uniformly in FsCheckpointStreamFactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-05 Thread via GitHub


StefanRRichter commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1442679306


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -730,57 +739,87 @@ void restoreInternal() throws Exception {
 
getEnvironment().getMetricGroup().getIOMetricGroup().markTaskInitializationStarted();
 LOG.debug("Initializing {}.", getName());
 
-operatorChain =
-
getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
-? new FinishedOperatorChain<>(this, recordWriter)
-: new RegularOperatorChain<>(this, recordWriter);
-mainOperator = operatorChain.getMainOperator();
+SubTaskInitializationMetricsBuilder initializationMetrics =
+new SubTaskInitializationMetricsBuilder(
+SystemClock.getInstance().absoluteTimeMillis());
+try {
+operatorChain =
+
getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
+? new FinishedOperatorChain<>(this, recordWriter)
+: new RegularOperatorChain<>(this, recordWriter);
+mainOperator = operatorChain.getMainOperator();
 
-getEnvironment()
-.getTaskStateManager()
-.getRestoreCheckpointId()
-.ifPresent(restoreId -> latestReportCheckpointId = restoreId);
+getEnvironment()

Review Comment:
   Argh, yes sorry reviewing in GH sometimes stinks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-05 Thread via GitHub


JunRuiLee commented on PR #24025:
URL: https://github.com/apache/flink/pull/24025#issuecomment-1878390963

   Thanks @1996fanrui for the quick reviews, I've updated this pr, PTAL~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-05 Thread via GitHub


StefanRRichter commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1442676217


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -155,7 +167,12 @@ public CheckpointStatsSnapshot createSnapshot() {
 counts.createSnapshot(),
 summary.createSnapshot(),
 history.createSnapshot(),
-latestRestoredCheckpoint);
+jobInitializationMetricsBuilder
+.map(
+JobInitializationMetricsBuilder
+
::buildRestoredCheckpointStats)
+.orElse(Optional.empty())
+.orElse(null));

Review Comment:
   Ah I see, makes sense. You could probably rewrite it as flatMap().orElse() 
though...
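
   For anyone skimming, a tiny self-contained illustration of the suggested rewrite (the String Optional is a stand-in for the metrics builder; nothing here is Flink code):
   ```
import java.util.Optional;

public class FlatMapRewrite {
    public static void main(String[] args) {
        // Stand-in for jobInitializationMetricsBuilder: an Optional whose mapping
        // function itself returns an Optional (like buildRestoredCheckpointStats()).
        Optional<String> builder = Optional.of("restored-checkpoint-stats");

        // Shape in the diff: map(...) yields a nested Optional, hence two orElse calls.
        String viaMap = builder.map(Optional::of).orElse(Optional.empty()).orElse(null);

        // Suggested shape: flatMap(...) collapses the nesting in one step.
        String viaFlatMap = builder.flatMap(Optional::of).orElse(null);

        System.out.println(viaMap + " == " + viaFlatMap);
    }
}
   ```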



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]

2024-01-05 Thread via GitHub


JunRuiLee commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1442675488


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -453,7 +440,10 @@ public StreamExecutionEnvironment setBufferTimeout(long 
timeoutMillis) {
 throw new IllegalArgumentException("Timeout of buffer must be 
non-negative or -1");
 }
 
-this.bufferTimeout = timeoutMillis;
+if (timeoutMillis == ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT) 
{
+this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, 
false);
+}
+this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT, 
Duration.ofMillis(timeoutMillis));
 return this;

Review Comment:
   In fact, the getter/setter API provides a user-friendly configuration 
method, and for me, I have not yet found a very clear and compelling reason for 
removing this API. 
   WDYT?
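
   To make the two styles under discussion concrete, a small sketch (the option is the one used in the diff above; the values are arbitrary):
   ```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class BufferTimeoutConfig {
    public static void main(String[] args) {
        // Setter style: the user-facing API this comment argues for keeping.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setBufferTimeout(100);

        // Configuration style: the same setting expressed through the config option.
        Configuration conf = new Configuration();
        conf.set(ExecutionOptions.BUFFER_TIMEOUT, Duration.ofMillis(100));
        StreamExecutionEnvironment envFromConf =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        System.out.println(envFromConf.getBufferTimeout());
    }
}
   ```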



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-05 Thread via GitHub


xintongsong commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1442654920


##
pom.xml:
##
@@ -2301,6 +2301,8 @@ under the License.

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.lang.Class,java.lang.Object[])

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)
+   
org.apache.flink.api.common.functions.RuntimeContext
+   
org.apache.flink.api.connector.source.SourceReaderContext

Review Comment:
   This should not be a hotfix commit. These changes do not make sense by 
themselves without the subsequent commits.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


