[jira] [Created] (FLINK-34007) Flink Job stuck in suspend state after recovery from failure in HA Mode
Zhenqiu Huang created FLINK-34007:
-------------------------------------

             Summary: Flink Job stuck in suspend state after recovery from failure in HA Mode
                 Key: FLINK-34007
                 URL: https://issues.apache.org/jira/browse/FLINK-34007
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.18.1, 1.18.2
            Reporter: Zhenqiu Huang


The observation is that the JobManager goes into the SUSPENDED state, with a failed container unable to register itself with the ResourceManager after the timeout.

JM Log:

2024-01-04 02:58:39,210 INFO org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner [] - JobMasterServiceLeadershipRunner for job 217cee964b2cfdc3115fb74cac0ec550 was revoked leadership with leader id eda6fee6-ce02-4076-9a99-8c43a92629f7. Stopping current JobMasterServiceProcess.
2024-01-04 02:58:58,347 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://172.16.71.11:8081 lost leadership
2024-01-04 02:58:58,347 INFO org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Resource manager service is revoked leadership with session id eda6fee6-ce02-4076-9a99-8c43a92629f7.
2024-01-04 02:58:58,348 INFO org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner [] - DefaultDispatcherRunner was revoked the leadership with leader id eda6fee6-ce02-4076-9a99-8c43a92629f7. Stopping the DispatcherLeaderProcess.
2024-01-04 02:58:58,348 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Stopping SessionDispatcherLeaderProcess.
2024-01-04 02:58:58,349 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping dispatcher pekko.tcp://flink@172.16.71.11:6123/user/rpc/dispatcher_1.
2024-01-04 02:58:58,349 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job 'amp-ade-fitness-clickstream-projection-uat' (217cee964b2cfdc3115fb74cac0ec550).
2024-01-04 02:58:58,349 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping all currently running jobs of dispatcher pekko.tcp://flink@172.16.71.11:6123/user/rpc/dispatcher_1.
2024-01-04 02:58:58,351 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 217cee964b2cfdc3115fb74cac0ec550 reached terminal state SUSPENDED.
2024-01-04 02:58:58,352 INFO org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Stopping credential renewal
2024-01-04 02:58:58,352 INFO org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Stopped credential renewal
2024-01-04 02:58:58,352 INFO org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Closing the slot manager.
2024-01-04 02:58:58,351 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job amp-ade-fitness-clickstream-projection-uat (217cee964b2cfdc3115fb74cac0ec550) switched from state RUNNING to SUSPENDED.
org.apache.flink.util.FlinkException: AdaptiveScheduler is being stopped.
    at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.closeAsync(AdaptiveScheduler.java:474) ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
    at org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:1093) ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
    at org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:1056) ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
    at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:454) ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239) ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.lambda$terminate$0(PekkoRpcActor.java:574) ~[flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.18.1.6-ase.jar:1.18.1.6-ase]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.terminate(PekkoRpcActor.java:573) ~[flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:196) ~[flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akkadb952114-fa83-4aba-b20a-b7e5771ce59c.jar:1.18.1.6-ase]
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443547398 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java: ## @@ -52,8 +69,26 @@ public void setPredicate(String predicate) { } public ParameterizedPredicate combine(String operator, ParameterizedPredicate that) { +int paramIndex = String.format("(%s %s ", this.predicate, operator).length(); +if (!that.indexesOfPredicatePlaceHolders.isEmpty()) { +paramIndex = paramIndex + that.indexesOfPredicatePlaceHolders.get(0); +} + this.predicate = String.format("(%s %s %s)", this.predicate, operator, that.predicate); this.parameters = ArrayUtils.addAll(this.parameters, that.parameters); + +for (int i = 0; i < this.indexesOfPredicatePlaceHolders.size(); i++) { +// increment all the existing indexes to account for the new additional first begin +// bracket +this.indexesOfPredicatePlaceHolders.set( +i, this.indexesOfPredicatePlaceHolders.get(i) + 1); +} +if (that.predicate.equals( + JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER) +|| (!that.indexesOfPredicatePlaceHolders.isEmpty())) { +// add index if that is a placeholder or has a placeholder. 
+this.indexesOfPredicatePlaceHolders.add(paramIndex); +}

Review Comment: Also, I tend to think that the approach with indexes will not work, because between finding the indexes and building `resolvedConditions` there is an optimizer which could change the SQL a bit. An example of a query where such a change happens:

```sql
SELECT * FROM a left join d FOR SYSTEM_TIME AS OF a.proctime on ((d.age = 50 AND d.type = 0) OR (d.type = 1 AND d.age =40)) and a.ip = d.ip;
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443523454 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java: ## @@ -52,8 +69,26 @@ public void setPredicate(String predicate) { } public ParameterizedPredicate combine(String operator, ParameterizedPredicate that) { +int paramIndex = String.format("(%s %s ", this.predicate, operator).length(); +if (!that.indexesOfPredicatePlaceHolders.isEmpty()) { +paramIndex = paramIndex + that.indexesOfPredicatePlaceHolders.get(0); +} + this.predicate = String.format("(%s %s %s)", this.predicate, operator, that.predicate); this.parameters = ArrayUtils.addAll(this.parameters, that.parameters); + +for (int i = 0; i < this.indexesOfPredicatePlaceHolders.size(); i++) { +// increment all the existing indexes to account for the new additional first begin +// bracket +this.indexesOfPredicatePlaceHolders.set( +i, this.indexesOfPredicatePlaceHolders.get(i) + 1); +} +if (that.predicate.equals( + JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER) +|| (!that.indexesOfPredicatePlaceHolders.isEmpty())) { +// add index if that is a placeholder or has a placeholder. +this.indexesOfPredicatePlaceHolders.add(paramIndex); +} Review Comment: This will not work for cases when constant on the left side e.g. 
```java
@Test
public void test123() {
    util.verifyExecPlan(
            "SELECT * FROM a left join d FOR SYSTEM_TIME AS OF a.proctime on (50 = d.age OR 1 = d.type) and a.ip = d.ip");
}
```

this test will fail with

```
Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:659)
    at java.util.ArrayList.get(ArrayList.java:435)
    at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getResolvedConditions(JdbcDynamicTableSource.java:157)
    at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getLookupRuntimeProvider(JdbcDynamicTableSource.java:120)
    at org.apache.flink.table.planner.plan.utils.LookupJoinUtil.createLookupRuntimeProvider(LookupJoinUtil.java:626)
    at org.apache.flink.table.planner.plan.utils.LookupJoinUtil.isAsyncLookup(LookupJoinUtil.java:413)
```
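The failure mode described in this comment (a parameter exists but no placeholder index was recorded for it) can be illustrated with a small, self-contained sketch. It is not the PR's code: the class `PlaceholderScanSketch` and its methods are hypothetical, and it assumes the pushdown placeholder is a single `?` character. Instead of tracking indexes while predicates are combined, it scans the final predicate string and fails fast when the placeholder and parameter counts disagree. Note that this only addresses the left-side-constant case; it does nothing about an optimizer rewriting the SQL between the two steps.

```java
import java.util.ArrayList;
import java.util.List;

final class PlaceholderScanSketch {
    // Assumption: the pushdown placeholder is a single '?' character.
    static final char PLACEHOLDER = '?';

    /** Collects the position of every placeholder in the final predicate string. */
    static List<Integer> findPlaceholders(String predicate) {
        List<Integer> indexes = new ArrayList<>();
        for (int i = 0; i < predicate.length(); i++) {
            if (predicate.charAt(i) == PLACEHOLDER) {
                indexes.add(i);
            }
        }
        return indexes;
    }

    /** Substitutes parameters in order, failing fast if the counts disagree. */
    static String resolve(String predicate, Object[] parameters) {
        List<Integer> indexes = findPlaceholders(predicate);
        if (indexes.size() != parameters.length) {
            throw new IllegalStateException(
                    "Found " + indexes.size() + " placeholders but "
                            + parameters.length + " parameters");
        }
        StringBuilder resolved = new StringBuilder();
        int processingIndex = 0;
        for (int i = 0; i < parameters.length; i++) {
            int placeholderIndex = indexes.get(i);
            // Copy the text before the placeholder, then the parameter value.
            resolved.append(predicate, processingIndex, placeholderIndex);
            resolved.append(parameters[i]);
            processingIndex = placeholderIndex + 1; // skip the placeholder itself
        }
        // Copy whatever follows the last placeholder.
        resolved.append(predicate, processingIndex, predicate.length());
        return resolved.toString();
    }

    public static void main(String[] args) {
        // Constants on the left side of each comparison.
        System.out.println(resolve("((? = age) OR (? = type))", new Object[] {50, 1}));
    }
}
```

Like the code under review, the sketch appends parameter values as-is, so quoting of string values is out of scope here.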
Re: [PR] [FLINK-33817][flink-protobuf] Set ReadDefaultValues=False by default in proto3 for performance improvement [flink]
flinkbot commented on PR #24035:
URL: https://github.com/apache/flink/pull/24035#issuecomment-1879438867

## CI report:

* a0672a71f647609a9a4790943070cc56f7da1d10 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-33817) Allow ReadDefaultValues = False for non primitive types on Proto3
[ https://issues.apache.org/jira/browse/FLINK-33817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-33817:
-----------------------------------
    Labels: pull-request-available  (was: )

> Allow ReadDefaultValues = False for non primitive types on Proto3
> -----------------------------------------------------------------
>
>                 Key: FLINK-33817
>                 URL: https://issues.apache.org/jira/browse/FLINK-33817
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.18.0
>            Reporter: Sai Sharath Dandi
>            Priority: Major
>              Labels: pull-request-available
>
> *Background*
>
> The current Protobuf format
> [implementation|https://github.com/apache/flink/blob/c3e2d163a637dca5f49522721109161bd7ebb723/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java]
> always sets ReadDefaultValues=True when using the Proto3 version. This can
> cause severe performance degradation for large Protobuf schemas with OneOf
> fields, as the entire generated code needs to be executed during
> deserialization even when certain fields are not present in the data to be
> deserialized and all the subsequent nested fields could be skipped. Proto3
> supports hasXXX() methods for checking field presence for non-primitive types
> since Proto version
> [3.15|https://github.com/protocolbuffers/protobuf/releases/tag/v3.15.0]. In
> the internal performance benchmarks in our company, we've seen an almost 10x
> difference in performance for one of our real production use cases when
> allowing ReadDefaultValues=False to be set with the Proto3 version. The exact
> difference in performance depends on the schema complexity and data payload,
> but we should allow users to set ReadDefaultValues=False in general.
>
> *Solution*
>
> Support using ReadDefaultValues=False when using the Proto3 version. We need to
> be careful to check for field presence only on non-primitive types if
> ReadDefaultValues is false and the version used is Proto3.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
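The presence-check idea in the ticket can be sketched in plain Java. Everything below is hypothetical: `User` and `Address` are hand-written stand-ins for protoc-generated classes (only the `hasXXX()`/`getXXX()` shape is mirrored), and `toRow` is not Flink's `ProtoToRowConverter`. The sketch only shows the rule from the *Solution* paragraph: with ReadDefaultValues=false, a non-primitive field that is absent is skipped and yields null, while a primitive field is always read because no presence information is assumed for it.

```java
final class FieldPresenceSketch {
    /** Stand-in for a generated nested message type. */
    static final class Address {
        static final Address DEFAULT = new Address("");
        final String city;

        Address(String city) {
            this.city = city;
        }
    }

    /** Stand-in for a generated proto3 message with one primitive and one message field. */
    static final class User {
        private final int age;         // primitive: no presence check assumed
        private final Address address; // null models "not set on the wire"

        User(int age, Address address) {
            this.age = age;
            this.address = address;
        }

        int getAge() {
            return age;
        }

        boolean hasAddress() {
            return address != null;
        }

        // Mirrors generated getters, which return a default instance rather than null.
        Address getAddress() {
            return address != null ? address : Address.DEFAULT;
        }
    }

    /** Converts a message to a two-column row: [age, address.city]. */
    static Object[] toRow(User user, boolean readDefaultValues) {
        Object age = user.getAge(); // primitives are always read
        Object city;
        if (!readDefaultValues && !user.hasAddress()) {
            city = null; // absent message field: skip the nested conversion entirely
        } else {
            city = user.getAddress().city;
        }
        return new Object[] {age, city};
    }

    public static void main(String[] args) {
        Object[] row = toRow(new User(0, null), false);
        System.out.println(row[0] + ", " + row[1]);
    }
}
```

The saving the ticket describes comes from the skipped branch: for a large schema, every nested field that is absent avoids its whole conversion subtree.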
[PR] [FLINK-33817][flink-protobuf] Set ReadDefaultValues=False by default in proto3 for performance improvement [flink]
sharath1709 opened a new pull request, #24035: URL: https://github.com/apache/flink/pull/24035 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443527603 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java: ## @@ -94,30 +97,76 @@ public JdbcDynamicTableSource( @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { -// JDBC only support non-nested look up keys +// JDBC only supports non-nested look up keys String[] keyNames = new String[context.getKeys().length]; + for (int i = 0; i < keyNames.length; i++) { int[] innerKeyArr = context.getKeys()[i]; Preconditions.checkArgument( innerKeyArr.length == 1, "JDBC only support non-nested look up keys"); keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]); } + final RowType rowType = (RowType) physicalRowDataType.getLogicalType(); + JdbcRowDataLookupFunction lookupFunction = new JdbcRowDataLookupFunction( options, lookupMaxRetryTimes, DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]), keyNames, -rowType); +rowType, + getResolvedConditions(this.pushdownParameterizedPredicates)); if (cache != null) { return PartialCachingLookupProvider.of(lookupFunction, cache); } else { return LookupFunctionProvider.of(lookupFunction); } } +@VisibleForTesting +protected String[] getResolvedConditions(List parameterizedPredicates) { +String[] conditions = null; +if (parameterizedPredicates != null) { +conditions = new String[parameterizedPredicates.size()]; + +for (int predicateIndex = 0; +predicateIndex < parameterizedPredicates.size(); +predicateIndex++) { +ParameterizedPredicate pushdownParameterizedPredicate = +parameterizedPredicates.get(predicateIndex); +String predicate = pushdownParameterizedPredicate.getPredicate(); +Serializable[] parameters = pushdownParameterizedPredicate.getParameters(); +ArrayList indexesOfPredicatePlaceHolders = + 
pushdownParameterizedPredicate.getIndexesOfPredicatePlaceHolders(); +// build up the resolved condition. +StringBuilder resolvedCondition = new StringBuilder(); +if (parameters == null || parameters.length == 0) { +// no parameter values to resolve (for example when there is a unary +// operation) +resolvedCondition.append(predicate); +} else { +int processingIndex = 0; +for (int parameterIndex = 0; +parameterIndex < parameters.length; +parameterIndex++) { +resolvedCondition.append( +predicate.substring( +processingIndex, + indexesOfPredicatePlaceHolders.get(parameterIndex)));

Review Comment: There is no need to call `substring`, since `append` can append just a part of the string based on the provided boundaries, e.g.

```java
resolvedCondition.append(
        predicate, processingIndex, indexesOfPredicatePlaceHolders.get(parameterIndex));
```
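For reference, a minimal standalone comparison of the two forms. The class and method names are made up for illustration; the only library call relied on is `StringBuilder.append(CharSequence, int, int)`, which copies the range `[start, end)` without creating an intermediate `String`. The sketch assumes a one-character placeholder at `placeholderIndex`.

```java
final class AppendRangeSketch {
    /** Variant from the diff: builds temporary Strings via substring. */
    static String viaSubstring(String predicate, int placeholderIndex, Object parameter) {
        StringBuilder sb = new StringBuilder();
        sb.append(predicate.substring(0, placeholderIndex));
        sb.append(parameter);
        sb.append(predicate.substring(placeholderIndex + 1));
        return sb.toString();
    }

    /** Variant from the review: appends the character range directly. */
    static String viaRangeAppend(String predicate, int placeholderIndex, Object parameter) {
        StringBuilder sb = new StringBuilder();
        sb.append(predicate, 0, placeholderIndex);
        sb.append(parameter);
        sb.append(predicate, placeholderIndex + 1, predicate.length());
        return sb.toString();
    }

    public static void main(String[] args) {
        String predicate = "(age = ?)";
        int index = predicate.indexOf('?');
        System.out.println(viaSubstring(predicate, index, 50));
        System.out.println(viaRangeAppend(predicate, index, 50));
    }
}
```

Both variants produce the same string; the range form simply avoids the extra allocations.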
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443526792 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java: ## @@ -94,30 +97,76 @@ public JdbcDynamicTableSource( @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { -// JDBC only support non-nested look up keys +// JDBC only supports non-nested look up keys String[] keyNames = new String[context.getKeys().length]; + for (int i = 0; i < keyNames.length; i++) { int[] innerKeyArr = context.getKeys()[i]; Preconditions.checkArgument( innerKeyArr.length == 1, "JDBC only support non-nested look up keys"); keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]); } + final RowType rowType = (RowType) physicalRowDataType.getLogicalType(); + JdbcRowDataLookupFunction lookupFunction = new JdbcRowDataLookupFunction( options, lookupMaxRetryTimes, DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]), keyNames, -rowType); +rowType, + getResolvedConditions(this.pushdownParameterizedPredicates)); if (cache != null) { return PartialCachingLookupProvider.of(lookupFunction, cache); } else { return LookupFunctionProvider.of(lookupFunction); } } +@VisibleForTesting +protected String[] getResolvedConditions(List parameterizedPredicates) { +String[] conditions = null; +if (parameterizedPredicates != null) { +conditions = new String[parameterizedPredicates.size()]; + +for (int predicateIndex = 0; +predicateIndex < parameterizedPredicates.size(); +predicateIndex++) { +ParameterizedPredicate pushdownParameterizedPredicate = +parameterizedPredicates.get(predicateIndex); +String predicate = pushdownParameterizedPredicate.getPredicate(); +Serializable[] parameters = pushdownParameterizedPredicate.getParameters(); +ArrayList indexesOfPredicatePlaceHolders = + 
pushdownParameterizedPredicate.getIndexesOfPredicatePlaceHolders(); +// build up the resolved condition. +StringBuilder resolvedCondition = new StringBuilder(); +if (parameters == null || parameters.length == 0) { +// no parameter values to resolve (for example when there is a unary +// operation) +resolvedCondition.append(predicate); +} else { +int processingIndex = 0; +for (int parameterIndex = 0; +parameterIndex < parameters.length; +parameterIndex++) { +resolvedCondition.append( +predicate.substring( +processingIndex, + indexesOfPredicatePlaceHolders.get(parameterIndex))); +resolvedCondition.append(parameters[parameterIndex]); +processingIndex = indexesOfPredicatePlaceHolders.get(parameterIndex) + 1; +} +resolvedCondition.append( +predicate.substring(processingIndex, predicate.length()));

Review Comment:

```suggestion
resolvedCondition.append(
        predicate.substring(processingIndex));
```

There is no need to specify the end index when it is the end of the string.
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
snuyanzin commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1443521093 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitor.java: ## @@ -97,12 +147,14 @@ private Optional renderBinaryOperator( Optional leftOperandString = allOperands.get(0).accept(this); Optional rightOperandString = allOperands.get(1).accept(this); - -return leftOperandString.flatMap( -left -> rightOperandString.map(right -> left.combine(operator, right))); +Optional renderedParameterizedPredicate = +leftOperandString.flatMap( +left -> rightOperandString.map(right -> left.combine(operator, right))); +return renderedParameterizedPredicate; } -private Optional renderUnaryOperator( +@VisibleForTesting +protected Optional renderUnaryOperator( Review Comment: Why is it marked as `VisibleForTesting` if it is not used in any test? ## flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java: ## @@ -390,7 +390,7 @@ void testJdbcLookupPropertiesWithExcludeEmptyResult() { assertThat(actual).isEqualTo(expected); } -private Map getAllOptions() { +protected static Map getAllOptions() { Review Comment: why do we need this? ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java: ## @@ -23,18 +23,35 @@ import org.apache.commons.lang3.ArrayUtils; import java.io.Serializable; +import java.util.ArrayList; /** A data class that model parameterized sql predicate. 
*/ @Experimental public class ParameterizedPredicate { private String predicate; private Serializable[] parameters; +private ArrayList indexesOfPredicatePlaceHolders = new ArrayList<>(); Review Comment: ```suggestion private List indexesOfPredicatePlaceHolders = new ArrayList<>(); ``` It is recommended to use interfaces rather than concrete implementations. ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/ParameterizedPredicate.java: ## @@ -52,8 +69,26 @@ public void setPredicate(String predicate) { } public ParameterizedPredicate combine(String operator, ParameterizedPredicate that) { +int paramIndex = String.format("(%s %s ", this.predicate, operator).length(); +if (!that.indexesOfPredicatePlaceHolders.isEmpty()) { +paramIndex = paramIndex + that.indexesOfPredicatePlaceHolders.get(0); +} + this.predicate = String.format("(%s %s %s)", this.predicate, operator, that.predicate); this.parameters = ArrayUtils.addAll(this.parameters, that.parameters); + +for (int i = 0; i < this.indexesOfPredicatePlaceHolders.size(); i++) { +// increment all the existing indexes to account for the new additional first begin +// bracket +this.indexesOfPredicatePlaceHolders.set( +i, this.indexesOfPredicatePlaceHolders.get(i) + 1); +} +if (that.predicate.equals( + JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER) +|| (!that.indexesOfPredicatePlaceHolders.isEmpty())) { +// add index if that is a placeholder or has a placeholder. +this.indexesOfPredicatePlaceHolders.add(paramIndex); +} Review Comment: This will not work for cases when the constant is on the left side, e.g.
```java
@Test
public void test123() {
    util.verifyExecPlan(
            "SELECT * FROM a left join d FOR SYSTEM_TIME AS OF a.proctime on (50 = d.age OR 1 = d.type) and a.ip = d.ip");
}
```

this test will fail with

```
Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:659)
    at java.util.ArrayList.get(ArrayList.java:435)
    at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getResolvedConditions(JdbcDynamicTableSource.java:157)
    at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getLookupRuntimeProvider(JdbcDynamicTableSource.java:120)
    at org.apache.flink.table.planner.plan.utils.LookupJoinUtil.createLookupRuntimeProvider(LookupJoinUtil.java:626)
    at org.apache.flink.table.planner.plan.utils.LookupJoinUtil.isAsyncLookup(LookupJoinUtil.java:413)
```

The method itself does some things which could be omitted; I would suggest replacing it with

```suggestion
this.predicate = String.format("(%s %s %s)", this.predicate, operator, that.predicate);
this.parameters = ArrayUtils.addAll(this.parameters, that.parameters);
```
[jira] [Created] (FLINK-34006) Flink terminates the execution of an application when there is a network problem between TaskManagers
Sophia created FLINK-34006:
------------------------------

             Summary: Flink terminates the execution of an application when there is a network problem between TaskManagers
                 Key: FLINK-34006
                 URL: https://issues.apache.org/jira/browse/FLINK-34006
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing, Runtime / Task
    Affects Versions: 1.17.1
            Reporter: Sophia


Flink terminates an application when two TaskManagers are disconnected from each other, although there are enough resources in the cluster to run the application and we use checkpointing with restart.

We deploy Flink v1.17.1 on a cluster of six nodes running Ubuntu 18.04; the cluster consists of a JobManager and five TaskManagers. We use Flink's Standalone resource manager. We set the number of slots per TaskManager to one and submit a WordCount application with a parallelism of three. We enable Flink checkpointing and configure the restart strategy to attempt a restart three times in case of failure before termination, with 10 seconds between attempts.

The application starts running on the first three TaskManagers. If the communication is broken between two of the TaskManagers that run the application, the job fails, and the JobManager tries to restart the job. When the job fails, the resources on the TaskManagers are free. When the JobManager restarts the job, it selects the same three TaskManagers it chose in the first attempt, and the job fails again. After three attempts, Flink terminates the job with an exception: Connecting to remote task manager has failed.

These are the JobManager configurations:
* taskmanager.numberOfTaskSlots: 1
* Enable checkpointing: --checkpointing
* execution.checkpointing.interval: 3min
* Enabling restart failover strategy
* restart-strategy.type: fixed-delay
* restart-strategy.fixed-delay.attempts: 3
* restart-strategy.fixed-delay.delay: 10 s

command: ./bin/flink run -p 3 examples/streaming/WordCount.jar --checkpointing --input ~/flink/alice.txt
Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]
rkhachatryan commented on code in PR #24031: URL: https://github.com/apache/flink/pull/24031#discussion_r1443459209 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java: ## @@ -105,6 +110,10 @@ public class RocksDBIncrementalRestoreOperation implements RocksDBRestoreOper private boolean isKeySerializerCompatibilityChecked; +private final TriConsumerWithException< +Collection, byte[], byte[], Exception> +rescalingRestoreFromLocalStateOperation; + Review Comment: This can be replaced by a boolean flag (`useIngestDbRestoreMode`) and a "normal" `if` and call in `restoreWithRescaling`. I think it would be more readable.
Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]
rkhachatryan commented on code in PR #24031: URL: https://github.com/apache/flink/pull/24031#discussion_r1443422130 ## flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java: ## @@ -419,7 +431,12 @@ private KeyedOneInputStreamOperatorTestHarness getHarne } private StateBackend getStateBackend() throws Exception { -return new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true); +RocksDBStateBackend rocksDBStateBackend = +new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true); +Configuration configuration = new Configuration(); Review Comment: Missing import for `Configuration`.
Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]
rkhachatryan commented on code in PR #24031: URL: https://github.com/apache/flink/pull/24031#discussion_r1443421426 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java: ## @@ -154,6 +162,54 @@ private static void deleteRange( } } +/** + * Clip the entries in the CF according to the range [begin_key, end_key). Any entries outside + * this range will be completely deleted (including tombstones). + * + * @param db the target need to be clipped. + * @param columnFamilyHandles the column family need to be clipped. + * @param beginKeyBytes the begin key bytes + * @param endKeyBytes the end key bytes + */ +public static void clipColumnFamilies( +RocksDB db, +List columnFamilyHandles, +byte[] beginKeyBytes, +byte[] endKeyBytes) +throws RocksDBException { + +for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { +db.clipColumnFamily(columnFamilyHandle, beginKeyBytes, endKeyBytes); +} +} + +public static List> +exportColumnFamilies( +RocksDB db, +List columnFamilyHandles, +List stateMetaInfoSnapshots, Review Comment: These two lists should match each other, right? Should we check with `Preconditions.checkState(columnFamilyHandles.size() == stateMetaInfoSnapshots.size());`, or better pass RocksDB instance here?
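A minimal sketch of the size check the reviewer asks about, written without Flink or RocksDB dependencies. `PairedListsSketch` and `pairUp` are hypothetical names, and the plain `IllegalStateException` stands in for Flink's `Preconditions.checkState`, which throws the same exception type. The idea is simply to refuse to pair two parallel lists by position unless their lengths agree.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class PairedListsSketch {
    /** Pairs elements by position, rejecting lists of different lengths. */
    static <A, B> List<Map.Entry<A, B>> pairUp(List<A> handles, List<B> metaInfos) {
        if (handles.size() != metaInfos.size()) {
            throw new IllegalStateException(
                    "Expected matching sizes but got "
                            + handles.size() + " handles and "
                            + metaInfos.size() + " meta infos");
        }
        List<Map.Entry<A, B>> pairs = new ArrayList<>(handles.size());
        for (int i = 0; i < handles.size(); i++) {
            pairs.add(new AbstractMap.SimpleImmutableEntry<>(handles.get(i), metaInfos.get(i)));
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> handles = new ArrayList<>();
        handles.add("cf-a");
        handles.add("cf-b");
        List<Integer> metaInfos = new ArrayList<>();
        metaInfos.add(1);
        metaInfos.add(2);
        System.out.println(pairUp(handles, metaInfos));
    }
}
```

Without such a check, a length mismatch would surface later as an `IndexOutOfBoundsException` or, worse, as silently mispaired entries.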
Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]
rkhachatryan commented on code in PR #24031: URL: https://github.com/apache/flink/pull/24031#discussion_r1443420860 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java: ## @@ -305,6 +311,11 @@ private EmbeddedRocksDBStateBackend( overlapFractionThreshold >= 0 && this.overlapFractionThreshold <= 1, "Overlap fraction threshold of restoring should be between 0 and 1"); +useIngestDbRestoreMode = +original.useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE +? config.get(USE_INGEST_DB_RESTORE_MODE) +: original.useIngestDbRestoreMode; Review Comment: Won't this ignore `original.useIngestDbRestoreMode` if it's set to `false`? (I guess this is a copy-paste from other fields, but they aren't boolean) ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java: ## @@ -329,74 +332,164 @@ private void restoreWithRescaling(Collection restoreStateHandl CompositeKeySerializationUtils.serializeKeyGroup( keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); -// Insert all remaining state through creating temporary RocksDB instances +rescalingRestoreFromLocalStateOperation.accept( +localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); +} finally { +// Cleanup all download directories +allDownloadSpecs.stream() +.map(StateHandleDownloadSpec::getDownloadDestination) +.forEach(this::cleanUpPathQuietly); +} +} + +private void rescaleCopyFromTemporaryInstance( +Collection localKeyedStateHandles, +byte[] startKeyGroupPrefixBytes, +byte[] stopKeyGroupPrefixBytes) +throws Exception { + +// Choose the best state handle for the initial DB +final IncrementalLocalKeyedStateHandle selectedInitialHandle = + RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial( +localKeyedStateHandles, keyGroupRange, overlapFractionThreshold); + 
+Preconditions.checkNotNull(selectedInitialHandle); + +// Remove the selected handle from the list so that we don't restore it twice. +localKeyedStateHandles.remove(selectedInitialHandle); + +// Init the base DB instance with the initial state +initBaseDBForRescaling(selectedInitialHandle); + +for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { +logger.info("Starting to restore from state handle: {} with rescaling.", stateHandle); + +try (RestoredDBInstance tmpRestoreDBInfo = +restoreTempDBInstanceFromLocalState(stateHandle); +RocksDBWriteBatchWrapper writeBatchWrapper = +new RocksDBWriteBatchWrapper( +this.rocksHandle.getDb(), writeBatchSize)) { + +List tmpColumnFamilyDescriptors = +tmpRestoreDBInfo.columnFamilyDescriptors; +List tmpColumnFamilyHandles = +tmpRestoreDBInfo.columnFamilyHandles; + +// iterating only the requested descriptors automatically skips the default +// column +// family handle +for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) { +ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx); + +ColumnFamilyHandle targetColumnFamilyHandle = + this.rocksHandle.getOrRegisterStateColumnFamilyHandle( +null, + tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx)) +.columnFamilyHandle; + +try (RocksIteratorWrapper iterator = +RocksDBOperationUtils.getRocksIterator( +tmpRestoreDBInfo.db, +tmpColumnFamilyHandle, +tmpRestoreDBInfo.readOptions)) { + +iterator.seek(startKeyGroupPrefixBytes); + +while (iterator.isValid()) { + +if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes( +iterator.key(), stopKeyGroupPrefixBytes)) { +writeBatchWrapper.put( +targetColumnFamilyHandle, iterator.key(), iterator.value()); +
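The first review comment in this message (on `useIngestDbRestoreMode`) is about telling "explicitly set to false" apart from "not set". One common way to model that is a tri-state value, sketched below. `TriState` and `RestoreModeResolution` are hypothetical names for illustration and are not taken from the PR; Flink ships a `TernaryBoolean` utility that serves a similar purpose, but the thread does not say whether the PR should use it.

```java
/** Hypothetical tri-state flag; names are illustrative and not taken from the PR. */
enum TriState {
    TRUE,
    FALSE,
    UNDEFINED;

    /** Returns the explicit value if one was set, otherwise the configured fallback. */
    boolean resolve(boolean fallbackFromConfig) {
        return this == UNDEFINED ? fallbackFromConfig : this == TRUE;
    }
}

final class RestoreModeResolution {

    /** Mimics the copy-constructor logic: an explicit setting wins over the config value. */
    static boolean resolveUseIngestDb(TriState original, boolean configValue) {
        return original.resolve(configValue);
    }
}
```

With this shape, an explicit `FALSE` on the original backend is preserved even when the configuration says `true`, which is the case the reviewer is asking about.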
Re: [PR] [FLINK-33996][table-planner]: Avoid merging projects if leads to redundant computation [flink]
jeyhunkarimov commented on PR #24033: URL: https://github.com/apache/flink/pull/24033#issuecomment-1879333784 @flinkbot run azure
Re: [PR] [FLINK-33611] [flink-protobuf] Support Large Protobuf Schemas [flink]
sharath1709 commented on PR #23937: URL: https://github.com/apache/flink/pull/23937#issuecomment-1879310386 @flinkbot run azure
Re: [PR] [FLINK-33996][table-planner]: Avoid merging projects if leads to redundant computation [flink]
jeyhunkarimov commented on PR #24033: URL: https://github.com/apache/flink/pull/24033#issuecomment-1879308733 @flinkbot run azure
Re: [PR] [FLINK-34005] Implement restore tests for MiniBatchAssigner node [flink]
flinkbot commented on PR #24034: URL: https://github.com/apache/flink/pull/24034#issuecomment-1879185905 ## CI report: * a671536a73ceee8b2fb8ec1cef69f342f979cd1e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-34005) Implement restore tests for MiniBatchAssigner node
[ https://issues.apache.org/jira/browse/FLINK-34005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34005: --- Labels: pull-request-available (was: ) > Implement restore tests for MiniBatchAssigner node > -- > > Key: FLINK-34005 > URL: https://issues.apache.org/jira/browse/FLINK-34005 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Bonnie Varghese >Assignee: Bonnie Varghese >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34005] Implement restore tests for MiniBatchAssigner node [flink]
bvarghese1 opened a new pull request, #24034:
URL: https://github.com/apache/flink/pull/24034

## What is the purpose of the change

*Add restore tests for MiniBatchAssigner node*

## Verifying this change

This change added tests and can be verified as follows:

- Added restore tests for the MiniBatchAssigner node, which verify the generated compiled plan against the saved compiled plan.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[jira] [Created] (FLINK-34005) Implement restore tests for MiniBatchAssigner node
Bonnie Varghese created FLINK-34005: --- Summary: Implement restore tests for MiniBatchAssigner node Key: FLINK-34005 URL: https://issues.apache.org/jira/browse/FLINK-34005 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Bonnie Varghese Assignee: Bonnie Varghese
Re: [PR] [hotfix] Update NOTICE files to reflect year 2023 [flink-connector-hive]
snuyanzin commented on PR #4: URL: https://github.com/apache/flink-connector-hive/pull/4#issuecomment-1879126027 Superseded by https://github.com/apache/flink-connector-hive/pull/6
Re: [PR] [hotfix] Update NOTICE files to reflect year 2023 [flink-connector-hive]
snuyanzin closed pull request #4: [hotfix] Update NOTICE files to reflect year 2023 URL: https://github.com/apache/flink-connector-hive/pull/4
Re: [PR] [FLINK-34002] [FLINK-33883][jdbc] Bump CI flink version on flink-connector-elasticsearch to support Flink 1.19 [flink-connector-elasticsearch]
snuyanzin commented on PR #86: URL: https://github.com/apache/flink-connector-elasticsearch/pull/86#issuecomment-1879122467 I submitted a PR to your branch to make the tests pass with JDK 17+: https://github.com/MartijnVisser/flink-connector-elasticsearch/pull/3
[jira] [Comment Edited] (FLINK-33925) Extended failure handling for bulk requests (elasticsearch back port)
[ https://issues.apache.org/jira/browse/FLINK-33925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803629#comment-17803629 ] Peter Schulz edited comment on FLINK-33925 at 1/5/24 5:13 PM: -- [~martijnvisser], this PR seems stuck. I'm not sure how to interpret the failed CI pipeline. It complains about some ITs violating architecture rules: {code} ITCASE tests should use a MiniCluster resource or extension' was violated (3 times): org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not satisfy… org.apache.flink.connector.opensearch.sink.OpensearchWriterITCase does not satisfy… org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does not satisfy… {code} Except for {{OpensearchWriterITCase}} my PR did not touch those files. How can we move this forward? was (Author: schulzp): [~martijnvisser], this PR seems stuck. I'm not sure how to interpret the failed CI pipeline. It complains about some ITs violating architecture rules: {code} ITCASE tests should use a MiniCluster resource or extension' was violated (3 times): org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not satisfy… org.apache.flink.connector.opensearch.sink.OpensearchWriterITCase does not satisfy… org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does not satisfy… {code} Except for `OpensearchWriterITCase` my PR did not touch those files. How can we move this forward? > Extended failure handling for bulk requests (elasticsearch back port) > - > > Key: FLINK-33925 > URL: https://issues.apache.org/jira/browse/FLINK-33925 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.1 >Reporter: Peter Schulz >Assignee: Peter Schulz >Priority: Major > Labels: pull-request-available > > This is a back port of the implementation for the elasticsearch connector, > see FLINK-32028, to achieve consistent APIs. 
[jira] [Comment Edited] (FLINK-33925) Extended failure handling for bulk requests (elasticsearch back port)
[ https://issues.apache.org/jira/browse/FLINK-33925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803629#comment-17803629 ] Peter Schulz edited comment on FLINK-33925 at 1/5/24 5:12 PM: -- [~martijnvisser], this PR seems stuck. I'm not sure how to interpret the failed CI pipeline. It complains about some ITs violating architecture rules: {code} ITCASE tests should use a MiniCluster resource or extension' was violated (3 times): org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not satisfy… org.apache.flink.connector.opensearch.sink.OpensearchWriterITCase does not satisfy… org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does not satisfy… {code} Except for `OpensearchWriterITCase` my PR did not touch those files. How can we move this forward? was (Author: schulzp): [~martijnvisser], this PR seems stuck. I'm not sure how to interpret the failed CI pipeline. It complains about some ITs violating architecture rules: ``` ITCASE tests should use a MiniCluster resource or extension' was violated (3 times): org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not satisfy… org.apache.flink.connector.opensearch.sink.OpensearchWriterITCase does not satisfy… org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does not satisfy… ``` Except for `OpensearchWriterITCase` my PR did not touch those files. How can we move this forward? > Extended failure handling for bulk requests (elasticsearch back port) > - > > Key: FLINK-33925 > URL: https://issues.apache.org/jira/browse/FLINK-33925 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.1 >Reporter: Peter Schulz >Assignee: Peter Schulz >Priority: Major > Labels: pull-request-available > > This is a back port of the implementation for the elasticsearch connector, > see FLINK-32028, to achieve consistent APIs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33925) Extended failure handling for bulk requests (elasticsearch back port)
[ https://issues.apache.org/jira/browse/FLINK-33925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803629#comment-17803629 ] Peter Schulz commented on FLINK-33925: -- [~martijnvisser], this PR seems stuck. I'm not sure how to interpret the failed CI pipeline. It complains about some ITs violating architecture rules: ``` ITCASE tests should use a MiniCluster resource or extension' was violated (3 times): org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase does not satisfy… org.apache.flink.connector.opensearch.sink.OpensearchWriterITCase does not satisfy… org.apache.flink.connector.opensearch.table.OpensearchDynamicSinkITCase does not satisfy… ``` Except for `OpensearchWriterITCase` my PR did not touch those files. How can we move this forward? > Extended failure handling for bulk requests (elasticsearch back port) > - > > Key: FLINK-33925 > URL: https://issues.apache.org/jira/browse/FLINK-33925 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.1 >Reporter: Peter Schulz >Assignee: Peter Schulz >Priority: Major > Labels: pull-request-available > > This is a back port of the implementation for the elasticsearch connector, > see FLINK-32028, to achieve consistent APIs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.
[ https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803626#comment-17803626 ]

Jeyhun Karimov commented on FLINK-33996:

Thanks for reporting the issue [~hackergin] and thanks for your comment [~xuyangzhong]. I mostly agree with [~xuyangzhong]. An alternative (or simpler) solution would be to add an extra check in the {{FlinkRelUtil::mergeable}} function, since all projection/calc merging rules use the result of this function before merging.

[~hackergin] IMO adding a new rule would be tricky since
- We would need to add the new rule in multiple places in the optimization process (codebase), because 1) project and calc merging rules are used several times and 2) the order of the new optimization rule will matter
- If we add a new rule, and the two project/calc nodes have already been merged (by the existing merging rules), it will be non-trivial for our new rule to roll back (separate the merged project/calc nodes)

What do you guys think? Also, please feel free to check/review the PR.

> Support disabling project rewrite when multiple exprs in the project reference the same sub project field.
> --
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.18.0
> Reporter: Feng Jin
> Priority: Major
> Labels: pull-request-available
>
> When multiple top projects reference the same bottom project, project rewrite rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explain plan for select a || 'a' as a, a || 'b' as b FROM (select REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>    +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
>
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
>
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> {code}
> It can be observed that after project rewrite, REGEXP_REPLACE is calculated twice. Generally speaking, regular expression matching is a time-consuming operation and we usually do not want it to be calculated multiple times. Therefore, for this scenario, we can support disabling project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>    +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
>
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>    +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
>
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>    +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule
Re: [PR] [FLINK-33996][table-planner]: Avoid merging projects if leads to redundant computation [flink]
flinkbot commented on PR #24033: URL: https://github.com/apache/flink/pull/24033#issuecomment-1878973201 ## CI report: * d687103dabf6ac2a0af763e8a3b845ef9629c7ba UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.
[ https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33996: --- Labels: pull-request-available (was: ) > Support disabling project rewrite when multiple exprs in the project > reference the same sub project field. > -- > > Key: FLINK-33996 > URL: https://issues.apache.org/jira/browse/FLINK-33996 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Affects Versions: 1.18.0 >Reporter: Feng Jin >Priority: Major > Labels: pull-request-available
[PR] [FLINK-33996][table-planner]: Avoid merging projects if leads to redundant computation [flink]
jeyhunkarimov opened a new pull request, #24033:
URL: https://github.com/apache/flink/pull/24033

## What is the purpose of the change

The current optimizer/planner merges two Projection/Calc nodes if they are deterministic. In some scenarios, this check is not enough. In particular, when the top Projection/Calc node uses an already computed value from the bottom Project/Calc node, merging the two might lead to redundant computation (recomputing the same expressions several times). In fact, our tests contained many cases that computed the same expression several times.

## Brief change log

- Extend the `mergeable` method and add an additional check
- Add tests

## Verifying this change

This change added tests to `CalcMergeTestBase`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
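To make the idea of the additional check concrete, here is a toy model of it. It is not Flink planner code and not the PR's implementation: `MergeCheckSketch`, its `mergeable` signature, and the "computed expression" flag are assumptions made for illustration. The sketch refuses to merge when a computed bottom expression is referenced more than once by the top projection, because inlining it would duplicate the computation.

```java
import java.util.List;

/** Illustrative only: a toy model of an extra merge check, not Flink planner code. */
final class MergeCheckSketch {

    /**
     * @param bottomIsComputed bottomIsComputed[i] is true if bottom field i is a computed
     *     expression (for example a function call) rather than a plain field reference
     * @param topInputRefs indexes of the bottom fields referenced by the top projection, one
     *     entry per reference
     * @return false if merging would inline a computed bottom expression more than once
     */
    static boolean mergeable(boolean[] bottomIsComputed, List<Integer> topInputRefs) {
        // Count how often each bottom field is referenced from the top projection.
        int[] refCounts = new int[bottomIsComputed.length];
        for (int ref : topInputRefs) {
            refCounts[ref]++;
        }
        // A computed field referenced twice would be evaluated twice after merging.
        for (int i = 0; i < refCounts.length; i++) {
            if (bottomIsComputed[i] && refCounts[i] > 1) {
                return false;
            }
        }
        return true;
    }
}
```

In the FLINK-33996 example, the bottom projection has one computed field (`REGEXP_REPLACE(...)`) that the top projection references twice, so this toy check would return `false` and keep the two nodes separate.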
[jira] [Closed] (FLINK-33775) Report JobInitialization traces
[ https://issues.apache.org/jira/browse/FLINK-33775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-33775. -- Fix Version/s: 1.19.0 Resolution: Fixed merged commit 793a66b into apache:master > Report JobInitialization traces > --- > > Key: FLINK-33775 > URL: https://issues.apache.org/jira/browse/FLINK-33775 > Project: Flink > Issue Type: Sub-task >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0
[jira] [Closed] (FLINK-33695) FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-33695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski closed FLINK-33695.
Resolution: Fixed

> FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces
>
> Key: FLINK-33695
> URL: https://issues.apache.org/jira/browse/FLINK-33695
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Checkpointing, Runtime / Metrics
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Major
> Fix For: 1.19.0
>
> https://cwiki.apache.org/confluence/x/TguZE
>
> *Motivation*
> Currently Flink has limited observability of the checkpoint and recovery processes.
> For checkpointing, Flink has a very detailed overview in the Flink WebUI, which works great in many use cases; however, it is problematic if one is operating multiple Flink clusters, or if the cluster/JM dies. Additionally, there are a couple of metrics (like lastCheckpointDuration or lastCheckpointSize); however, those metrics have a couple of issues:
> * They are reported and refreshed periodically, depending on the MetricReporter settings, which doesn't take into account checkpointing frequency.
> ** If checkpointing interval > metric reporting interval, we would be reporting the same values multiple times.
> ** If checkpointing interval < metric reporting interval, we would be randomly dropping metrics for some of the checkpoints.
> For recovery we are missing even the most basic metrics and Flink WebUI support. Also, given that recovery is even less frequent than checkpoints, adding recovery metrics would have even bigger problems with unnecessarily reporting the same values.
> In this FLIP I'm proposing to add support for reporting traces/spans (example: Traces) and use this mechanism to report checkpointing and recovery traces. I hope in the future traces will also prove useful in other areas of Flink like job submission, job state changes, and so on. Moreover, as the API to report traces will be added to the MetricGroup, users will also be able to access this API.
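The mismatch between checkpoint frequency and reporting frequency described in the motivation can be illustrated with a small simulation. This is a sketch under simplifying assumptions that are not from the FLIP: checkpoints complete instantly at exact multiples of the checkpoint interval, and the reporter reads a "last checkpoint" gauge at exact multiples of the reporting interval. `GaugeSamplingSketch` and `simulate` are hypothetical names.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy simulation of a periodically sampled "last checkpoint" gauge (illustrative only). */
final class GaugeSamplingSketch {

    /**
     * Returns a two-element array: the number of reports that repeat an already reported
     * checkpoint, and the number of completed checkpoints that were never reported.
     */
    static int[] simulate(long checkpointIntervalMs, long reportIntervalMs, long horizonMs) {
        Set<Long> reportedCheckpoints = new LinkedHashSet<>();
        int repeatedReports = 0;
        for (long t = reportIntervalMs; t <= horizonMs; t += reportIntervalMs) {
            // Id of the latest checkpoint completed at or before time t (0 means none yet).
            long lastCheckpoint = t / checkpointIntervalMs;
            if (lastCheckpoint == 0) {
                continue;
            }
            if (!reportedCheckpoints.add(lastCheckpoint)) {
                repeatedReports++;
            }
        }
        long completedCheckpoints = horizonMs / checkpointIntervalMs;
        int neverReported = (int) (completedCheckpoints - reportedCheckpoints.size());
        return new int[] {repeatedReports, neverReported};
    }
}
```

Under these assumptions, a 60 s checkpoint interval with 10 s reporting over 120 s yields repeated reports and no dropped checkpoints, while swapping the two intervals yields no repeats but most checkpoints never being reported. Event-style traces avoid both effects because one span is emitted per checkpoint.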
Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]
pnowojski merged PR #23908: URL: https://github.com/apache/flink/pull/23908
Re: [PR] [FLINK-33978] Introduces AsyncScalarFunction as a new UDF type [flink]
alpinegizmo commented on PR #23975: URL: https://github.com/apache/flink/pull/23975#issuecomment-1878877745 We should add appropriate content to the UDF docs as part of implementing this FLIP. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/ is the page where that goes. I think a new section right after the section on Scalar Functions would be the right place for that. Most of the text needed can be copied out of the FLIP, along with the example.
[jira] [Updated] (FLINK-33978) FLIP-400: AsyncScalarFunction for asynchronous scalar function support
[ https://issues.apache.org/jira/browse/FLINK-33978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33978: --- Labels: pull-request-available (was: ) > FLIP-400: AsyncScalarFunction for asynchronous scalar function support > -- > > Key: FLINK-33978 > URL: https://issues.apache.org/jira/browse/FLINK-33978 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner, Table SQL / Runtime >Reporter: Alan Sheinberg >Priority: Major > Labels: pull-request-available > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
Re: [PR] [FLINK-33978] Introduces AsyncScalarFunction as a new UDF type [flink]
twalthr commented on code in PR #23975: URL: https://github.com/apache/flink/pull/23975#discussion_r1442740953 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java: ## @@ -692,6 +734,14 @@ public enum AsyncOutputMode { ALLOW_UNORDERED } +/** Retry strategy in the case of failure. */ +@Experimental Review Comment: drop the annotation, it should be well tested already ## docs/layouts/shortcodes/generated/execution_config_configuration.html: ## @@ -26,6 +26,36 @@ Duration The async timeout for the asynchronous operation to complete. + +table.exec.async-scalar.buffer-capacity Streaming +10 +Integer +The max number of async i/o operation that the async lookup join can trigger. + + +table.exec.async-scalar.max-attempts Streaming +3 +Integer +The max number of async retry attempts to make when Review Comment: incomplete sentence ## docs/layouts/shortcodes/generated/execution_config_configuration.html: ## @@ -26,6 +26,36 @@ Duration The async timeout for the asynchronous operation to complete. + +table.exec.async-scalar.buffer-capacity Streaming +10 +Integer +The max number of async i/o operation that the async lookup join can trigger. + + +table.exec.async-scalar.max-attempts Streaming +3 +Integer +The max number of async retry attempts to make when + + +table.exec.async-scalar.retry-delay Streaming +100 ms +Duration +The delay to wait before trying again. + + +table.exec.async-scalar.retry-strategy Streaming +FIXED_DELAY +Enum +Restart strategy which will be used, FIXED_DELAY by default.Possible values:"FIXED_DELAY" Review Comment: an enum with just 1 value makes not much sense, what other strategies can we offer. I see at least a "no retry" one? 
## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java: ## @@ -67,6 +67,8 @@ public final class UserDefinedFunctionHelper { public static final String SCALAR_EVAL = "eval"; +public static final String ASYNC_SCALAR_EVAL = "eval"; + public static final String TABLE_EVAL = "eval"; Review Comment: Add special validation for this method to check that the return type is always void and the first argument is always a CompletableFuture. Otherwise users might get a confusing error for `eval(int i)` in DataTypeExtractor. ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java: ## @@ -155,6 +157,48 @@ public static DataType extractFromMethodParameter( paramPos, method.getName(), baseClass.getName())); } +/** + * Extracts a data type from a method parameter by considering surrounding classes and parameter + * annotation. This version assumes that the parameter is a generic type, and uses the generic + * position type as the extracted data type. For example, if the parameter is a + * CompletableFuture<Long> and genericPos is 0, it will extract Long. + */ +public static DataType extractFromGenericMethodParameter( +DataTypeFactory typeFactory, +Class baseClass, +Method method, +int paramPos, +int genericPos) { + +Type parameterType = method.getGenericParameterTypes()[paramPos]; +// Trusty library allows to resolve generic types where the type resolves to a type with +// generic parameters... +parameterType = TypeToken.of(baseClass).resolveType(parameterType).getType(); Review Comment: Can you give a concrete example where this line is necessary? Personally, I would like to avoid dependencies on Guava in these crucial utils. `extractDataTypeWithClassContext` can serve the same purpose and resolve type variables. I think we can drop this line if we validate for a CompletableFuture in the helper as mentioned above.
## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncScalarFunction.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at
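The validation requested in the review comment on `UserDefinedFunctionHelper` (return type `void`, first argument a `CompletableFuture`) can be sketched with plain `java.lang.reflect`, without Guava. This is only an illustration under assumptions: `AsyncEvalSignatureCheck`, `validateAndExtractResultType`, `GoodFunction` and `BadFunction` are hypothetical names that do not appear in the PR, and the sketch does not resolve type variables against a base class the way the real extractor has to.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper for illustration; not Flink's UserDefinedFunctionHelper.
final class AsyncEvalSignatureCheck {

    /**
     * Checks that the method looks like void eval(CompletableFuture<T>, ...)
     * and returns T, the type argument at generic position 0.
     */
    static Type validateAndExtractResultType(Method method) {
        if (method.getReturnType() != void.class) {
            throw new IllegalArgumentException(
                    "Method '" + method.getName() + "' must return void.");
        }
        Class<?>[] rawParams = method.getParameterTypes();
        if (rawParams.length == 0 || rawParams[0] != CompletableFuture.class) {
            throw new IllegalArgumentException(
                    "First argument of '" + method.getName() + "' must be a CompletableFuture.");
        }
        Type first = method.getGenericParameterTypes()[0];
        if (!(first instanceof ParameterizedType)) {
            // A raw CompletableFuture carries no result type to extract.
            throw new IllegalArgumentException(
                    "The CompletableFuture argument must declare its result type.");
        }
        // CompletableFuture has exactly one type argument.
        return ((ParameterizedType) first).getActualTypeArguments()[0];
    }

    // Example functions used only by the demonstration below.
    static class GoodFunction {
        public void eval(CompletableFuture<Long> result, int i) {
            result.complete((long) i);
        }
    }

    static class BadFunction {
        public int eval(int i) {
            return i;
        }
    }

    public static void main(String[] args) throws Exception {
        Method good = GoodFunction.class.getMethod("eval", CompletableFuture.class, int.class);
        System.out.println("result type: " + validateAndExtractResultType(good));

        Method bad = BadFunction.class.getMethod("eval", int.class);
        try {
            validateAndExtractResultType(bad);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing early with a message that names the method is what keeps a signature such as `eval(int i)` from surfacing later as an unrelated extraction error.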
Re: [PR] [FLINK-34004] Harden TestingCheckpointIDCounter [flink]
flinkbot commented on PR #24032: URL: https://github.com/apache/flink/pull/24032#issuecomment-1878827522 ## CI report: * 9a8406e252f89c78bcd4c9beb36e967c18a83045 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34004) TestingCheckpointIDCounter can easily lead to NPEs
[ https://issues.apache.org/jira/browse/FLINK-34004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34004: --- Labels: pull-request-available (was: ) > TestingCheckpointIDCounter can easily lead to NPEs > -- > > Key: FLINK-34004 > URL: https://issues.apache.org/jira/browse/FLINK-34004 > Project: Flink > Issue Type: Technical Debt > Components: Tests >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > The TestingCheckpointIDCounter builder doesn't define safe defaults for all > builder parameters. Using it can easily lead to surprising null pointer > exceptions in tests when code is being modified to call more methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34004) TestingCheckpointIDCounter can easily lead to NPEs
Chesnay Schepler created FLINK-34004: Summary: TestingCheckpointIDCounter can easily lead to NPEs Key: FLINK-34004 URL: https://issues.apache.org/jira/browse/FLINK-34004 Project: Flink Issue Type: Technical Debt Components: Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.19.0 The TestingCheckpointIDCounter builder doesn't define safe defaults for all builder parameters. Using it can easily lead to surprising null pointer exceptions in tests when code is being modified to call more methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
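The problem described in FLINK-34004 (a testing builder whose parameters default to `null`) can be illustrated with a small sketch. This is not Flink's `TestingCheckpointIDCounter`; `TestingCounter` and its three callbacks are hypothetical, simplified stand-ins chosen only to show the idea of giving every builder parameter a harmless default.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in; not Flink's TestingCheckpointIDCounter.
final class TestingCounter {
    private final Runnable startRunnable;
    private final Supplier<Long> getAndIncrementSupplier;
    private final Consumer<Long> setCountConsumer;

    private TestingCounter(Builder builder) {
        this.startRunnable = builder.startRunnable;
        this.getAndIncrementSupplier = builder.getAndIncrementSupplier;
        this.setCountConsumer = builder.setCountConsumer;
    }

    void start() {
        startRunnable.run();
    }

    long getAndIncrement() {
        return getAndIncrementSupplier.get();
    }

    void setCount(long count) {
        setCountConsumer.accept(count);
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Every callback starts as a no-op or constant instead of null, so a test
        // that configures only one callback can still call the other methods.
        private Runnable startRunnable = () -> {};
        private Supplier<Long> getAndIncrementSupplier = () -> 0L;
        private Consumer<Long> setCountConsumer = ignored -> {};

        Builder withStartRunnable(Runnable startRunnable) {
            this.startRunnable = Objects.requireNonNull(startRunnable);
            return this;
        }

        Builder withGetAndIncrementSupplier(Supplier<Long> supplier) {
            this.getAndIncrementSupplier = Objects.requireNonNull(supplier);
            return this;
        }

        Builder withSetCountConsumer(Consumer<Long> consumer) {
            this.setCountConsumer = Objects.requireNonNull(consumer);
            return this;
        }

        TestingCounter build() {
            return new TestingCounter(this);
        }
    }

    public static void main(String[] args) {
        // Only one callback is configured; the others fall back to safe defaults.
        TestingCounter counter =
                TestingCounter.builder().withGetAndIncrementSupplier(() -> 42L).build();
        counter.start();
        counter.setCount(7L);
        System.out.println(counter.getAndIncrement());
    }
}
```

With defaults in place, production code that starts calling an additional method no longer breaks unrelated tests with a null pointer exception; tests that care about a callback still override it explicitly.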
[PR] Add HBase 3.0.1 connector [flink-web]
MartijnVisser opened a new pull request, #708: URL: https://github.com/apache/flink-web/pull/708 (no comment)
[jira] [Updated] (FLINK-32571) Prebuild HBase testing docker image
[ https://issues.apache.org/jira/browse/FLINK-32571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-32571: --- Fix Version/s: (was: hbase-3.0.1) > Prebuild HBase testing docker image > --- > > Key: FLINK-32571 > URL: https://issues.apache.org/jira/browse/FLINK-32571 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / HBase >Reporter: Chesnay Schepler >Priority: Major > > For testing we currently build an HBase docker image on-demand during > testing. We can improve reliability and testing times by building this image > ahead of time, as the only parameter is the HBase version. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34003) Bump CI flink version on flink-connector-hbase
[ https://issues.apache.org/jira/browse/FLINK-34003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34003: --- Labels: pull-request-available (was: ) > Bump CI flink version on flink-connector-hbase > -- > > Key: FLINK-34003 > URL: https://issues.apache.org/jira/browse/FLINK-34003 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / HBase >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34003) Bump CI flink version on flink-connector-hbase
Martijn Visser created FLINK-34003: -- Summary: Bump CI flink version on flink-connector-hbase Key: FLINK-34003 URL: https://issues.apache.org/jira/browse/FLINK-34003 Project: Flink Issue Type: Technical Debt Components: Connectors / HBase Reporter: Martijn Visser Assignee: Martijn Visser -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]
flinkbot commented on PR #24031: URL: https://github.com/apache/flink/pull/24031#issuecomment-1878781250 ## CI report: * 056b9af12d0795dadabde47e50dfb040e14ad71d UNKNOWN
Re: [PR] [FLINK-31238] [WIP] Use IngestDB to speed up Rocksdb rescaling recovery (rebased) [flink]
StefanRRichter commented on code in PR #24031: URL: https://github.com/apache/flink/pull/24031#discussion_r1442957056 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java: ## @@ -414,90 +424,70 @@ private void restoreWithRescaling(Collection restoreStateHandl * temporary RocksDB instance for a key-groups shard. All contents from the temporary instance * are copied into the real restore instance and then the temporary instance is discarded. */ -private void restoreWithIngestDbMode(Collection restoreStateHandles) +private void rescaleClipIngestDB( +Collection localKeyedStateHandles, +byte[] startKeyGroupPrefixBytes, +byte[] stopKeyGroupPrefixBytes) throws Exception { -Preconditions.checkArgument(restoreStateHandles != null && !restoreStateHandles.isEmpty()); - -Map allDownloadSpecs = - CollectionUtil.newHashMapWithExpectedSize(restoreStateHandles.size()); - final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs"); Files.createDirectories(exportCfBasePath); -// Open base db as Empty DB -this.rocksHandle.openDB(); - -// Prepare and collect all the download request to pull remote state to a local directory -for (KeyedStateHandle stateHandle : restoreStateHandles) { -if (!(stateHandle instanceof IncrementalRemoteKeyedStateHandle)) { -throw unexpectedStateHandleException( -IncrementalRemoteKeyedStateHandle.class, stateHandle.getClass()); -} -StateHandleDownloadSpec downloadRequest = -new StateHandleDownloadSpec( -(IncrementalRemoteKeyedStateHandle) stateHandle, - absolutInstanceBasePath.resolve(UUID.randomUUID().toString())); -allDownloadSpecs.put(stateHandle.getStateHandleId(), downloadRequest); -} - -// Process all state downloads -transferRemoteStateToLocalDirectory(allDownloadSpecs.values()); - -// Transfer remaining key-groups from temporary instance into base DB 
-byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; -CompositeKeySerializationUtils.serializeKeyGroup( -keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); +final Map> +columnFamilyMetaDataToImport = new HashMap<>(); -byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; -CompositeKeySerializationUtils.serializeKeyGroup( -keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); - -HashMap> cfMetaDataToImport = -new HashMap(); - -// Insert all remaining state through creating temporary RocksDB instances -for (StateHandleDownloadSpec downloadRequest : allDownloadSpecs.values()) { -logger.info( -"Starting to restore from state handle: {} with rescaling.", -downloadRequest.getStateHandle()); +try { +for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { +logger.info( +"Starting to restore from state handle: {} with rescaling using Clip/Ingest DB.", +stateHandle); -try (RestoredDBInstance tmpRestoreDBInfo = -restoreTempDBInstanceFromDownloadedState(downloadRequest)) { +try (RestoredDBInstance tmpRestoreDBInfo = +restoreTempDBInstanceFromLocalState(stateHandle)) { -List tmpColumnFamilyHandles = -tmpRestoreDBInfo.columnFamilyHandles; +List tmpColumnFamilyHandles = +tmpRestoreDBInfo.columnFamilyHandles; -// Clip all tmp db to Range [startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes) -RocksDBIncrementalCheckpointUtils.clipColumnFamilies( -tmpRestoreDBInfo.db, -tmpColumnFamilyHandles, -startKeyGroupPrefixBytes, -stopKeyGroupPrefixBytes); - -// Export all the Column Families -Map exportedCFAndMetaData = -RocksDBIncrementalCheckpointUtils.exportColumnFamilies( -tmpRestoreDBInfo.db, -tmpColumnFamilyHandles, -tmpRestoreDBInfo.stateMetaInfoSnapshots, -exportCfBasePath); - -exportedCFAndMetaData.forEach( -(stateMeta, cfMetaData) -> { -
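The diff above clips each temporary DB to the half-open range `[startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes)`, where the removed code derived the bounds from the start key group and the end key group plus one. A minimal sketch of that range computation follows. It assumes the key-group prefix is written big-endian into a fixed number of bytes; `KeyGroupClipRange`, `serializeKeyGroup` (as written here) and `inRange` are hypothetical helpers for illustration, not the Flink or RocksDB API.

```java
import java.util.Arrays;

// Hypothetical illustration of key-group prefix ranges; not Flink code.
final class KeyGroupClipRange {

    /** Writes the key group big-endian into a prefix of the given length (assumed encoding). */
    static byte[] serializeKeyGroup(int keyGroup, int prefixLength) {
        byte[] out = new byte[prefixLength];
        for (int i = 0; i < prefixLength; i++) {
            out[i] = (byte) (keyGroup >>> ((prefixLength - i - 1) * Byte.SIZE));
        }
        return out;
    }

    /**
     * Returns true if the key's prefix lies in [start, stop). The comparison is
     * unsigned so that key groups of 128 and above still sort after smaller ones.
     * The key must be at least as long as the prefixes.
     */
    static boolean inRange(byte[] key, byte[] start, byte[] stop) {
        return Arrays.compareUnsigned(key, 0, start.length, start, 0, start.length) >= 0
                && Arrays.compareUnsigned(key, 0, stop.length, stop, 0, stop.length) < 0;
    }

    public static void main(String[] args) {
        int prefixLength = 2;
        int startKeyGroup = 2;
        int endKeyGroup = 5; // inclusive, so the exclusive stop bound is endKeyGroup + 1

        byte[] start = serializeKeyGroup(startKeyGroup, prefixLength);
        byte[] stop = serializeKeyGroup(endKeyGroup + 1, prefixLength);

        byte[] keyInGroup5 = {0, 5, 42};
        byte[] keyInGroup6 = {0, 6, 42};
        System.out.println(inRange(keyInGroup5, start, stop));
        System.out.println(inRange(keyInGroup6, start, stop));
    }
}
```

Using `endKeyGroup + 1` as the exclusive upper bound keeps every key of the last owned key group inside the range, whatever bytes follow the prefix.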
[jira] [Commented] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector
[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803557#comment-17803557 ] Sergey Nuyanzin commented on FLINK-33365: - marked as a blocker as mentioned at https://lists.apache.org/thread/pc15twvf90l0d6t4vthomh6jzv1h145d > Missing filter condition in execution plan containing lookup join with mysql > jdbc connector > --- > > Key: FLINK-33365 > URL: https://issues.apache.org/jira/browse/FLINK-33365 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.18.0, 1.17.1 > Environment: Flink 1.17.1 & Flink 1.18.0 with > flink-connector-jdbc-3.1.1-1.17.jar >Reporter: macdoor615 >Assignee: david radley >Priority: Blocker > Labels: pull-request-available > Attachments: flink-connector-jdbc-3.0.0-1.16.png, > flink-connector-jdbc-3.1.1-1.17.png > > > create table in flink with sql-client.sh > {code:java} > CREATE TABLE default_catalog.default_database.a ( > ip string, > proctime as proctime() > ) > WITH ( > 'connector' = 'datagen' > );{code} > create table in mysql > {code:java} > create table b ( > ip varchar(20), > type int > ); {code} > > Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar* > execute in sql-client.sh > {code:java} > explain SELECT * FROM default_catalog.default_database.a left join > bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and > a.ip = b.ip; {code} > get the execution plan > {code:java} > ... 
> == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip > AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]){code} > > execute the same sql in sql-client with Flink 1.17.1/ 1.18.0 and > *flink-connector-jdbc-3.0.0-1.16.jar* > get the execution plan > {code:java} > ... > == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 > AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]) {code} > with flink-connector-jdbc-3.1.1-1.17.jar, the condition is > *lookup=[ip=ip]* > with flink-connector-jdbc-3.0.0-1.16.jar , the condition is > *lookup=[type=0, ip=ip], where=[(type = 0)]* > > In our real-world production environment, this leads to incorrect data output
[jira] [Updated] (FLINK-34002) Bump CI flink version on flink-connector-elasticsearch
[ https://issues.apache.org/jira/browse/FLINK-34002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34002: --- Labels: pull-request-available (was: ) > Bump CI flink version on flink-connector-elasticsearch > -- > > Key: FLINK-34002 > URL: https://issues.apache.org/jira/browse/FLINK-34002 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / ElasticSearch >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34002) Bump CI flink version on flink-connector-elasticsearch
Martijn Visser created FLINK-34002: -- Summary: Bump CI flink version on flink-connector-elasticsearch Key: FLINK-34002 URL: https://issues.apache.org/jira/browse/FLINK-34002 Project: Flink Issue Type: Technical Debt Components: Connectors / ElasticSearch Reporter: Martijn Visser Assignee: Martijn Visser -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33365) Missing filter condition in execution plan containing lookup join with mysql jdbc connector
[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-33365: Priority: Blocker (was: Critical)
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl closed pull request #79: [FLINK-33365] include filters with Lookup joins URL: https://github.com/apache/flink-connector-jdbc/pull/79
[PR] Bump org.apache.commons:commons-compress from 1.23.0 to 1.24.0 [flink-connector-hbase]
dependabot[bot] opened a new pull request, #38: URL: https://github.com/apache/flink-connector-hbase/pull/38 Bumps org.apache.commons:commons-compress from 1.23.0 to 1.24.0.
[PR] Bump org.apache.zookeeper:zookeeper from 3.4.14 to 3.7.2 [flink-connector-hbase]
dependabot[bot] opened a new pull request, #37: URL: https://github.com/apache/flink-connector-hbase/pull/37 Bumps org.apache.zookeeper:zookeeper from 3.4.14 to 3.7.2.
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1878726394 Fixed the test failures
[PR] Add JDBC 3.1.2 connector [flink-web]
MartijnVisser opened a new pull request, #707: URL: https://github.com/apache/flink-web/pull/707 (no comment)
[jira] [Commented] (FLINK-33897) Allow triggering unaligned checkpoint via CLI
[ https://issues.apache.org/jira/browse/FLINK-33897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803535#comment-17803535 ] Piotr Nowojski commented on FLINK-33897: By real-world motivation, I meant: is this really an issue that someone has complained about? If not, and this is just a theoretical possibility that comes from your observation while implementing FLINK-6755 ("it could be implemented, someone might find it useful"), I would put it aside for the time being. Honestly, I doubt many users would use this feature. In most cases, just cancelling the job and restarting it with a new configuration would be faster than first finding out from the docs, the user mailing list, or Stack Overflow that an unaligned checkpoint can actually be triggered from the CLI. This would only be useful to a handful of power users, but those should already know that it's better to use unaligned checkpoints from the get-go. {quote} I'm not very familiar with this part so if you think this is a big change, I won't insist on doing it. {quote} Adding a new BarrierHandlerState is maybe not a very big change per se, but it will visibly increase the complexity of the code when someone needs to read and understand it. {quote} I do agree we could enable timeout for aligned cp by default, which greatly reduces this case {quote} Let me start the dev mailing list discussion about that. > Allow triggering unaligned checkpoint via CLI > - > > Key: FLINK-33897 > URL: https://issues.apache.org/jira/browse/FLINK-33897 > Project: Flink > Issue Type: New Feature > Components: Command Line Client, Runtime / Checkpointing >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > > After FLINK-6755, users can trigger a checkpoint through the CLI. However, I > noticed there would be value in supporting triggering it in an unaligned way, since > the job may encounter high back-pressure and an aligned checkpoint would > fail. > > I suggest we provide an option '-unaligned' in the CLI to support that. 
> > A similar option would also be useful for the REST API
Re: [PR] [FLINK-33970][jdbc][docs] Remove dead link [flink-connector-jdbc]
GOODBOY008 commented on PR #88: URL: https://github.com/apache/flink-connector-jdbc/pull/88#issuecomment-1878625591 @MartijnVisser I want to fix the dead link in the documentation first. I also want to add a dead-link check to https://github.com/apache/flink-connector-shared-utils/tree/ci_utils, because the Hugo build cannot detect dead links in the documentation. Should this check also be added to the flink repo?
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#issuecomment-1878620080 I see there are some test failures - I will look into them
[PR] [FLINK-33970][jdbc][docs] Add docs check dead links [flink-connector-shared-utils]
GOODBOY008 opened a new pull request, #32: URL: https://github.com/apache/flink-connector-shared-utils/pull/32 Add a docs check for dead links.
Re: [PR] [FLINK-33634] Add Conditions to Flink CRD's Status field [flink-kubernetes-operator]
tagarr commented on code in PR #749: URL: https://github.com/apache/flink-kubernetes-operator/pull/749#discussion_r1442824067 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java: ## @@ -227,4 +235,28 @@ private boolean validateDeployment(FlinkResourceContext ctx) { } return true; } + +private void setCRStatus(FlinkDeployment flinkApp) { +final List conditions = new ArrayList<>(); +FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus(); +switch (deploymentStatus.getJobManagerDeploymentStatus()) { Review Comment: I think we need a bit more logic here. We need to check whether the status already has a condition of the same type in its list. If it does, we then need to compare the reason, status and message of that condition, and replace the existing condition with the new one only if they differ. We shouldn't just replace the existing list of conditions with a new one, because conditions of other types that may be added later would get removed. This may be one of the reasons why the tests had to be modified to bump up the number of status changes.
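The merge logic described in the review comment could be sketched roughly as follows. This is only an illustration under stated assumptions: `StatusCondition` and `ConditionMerger` are hypothetical names, not classes from the operator or from the PR, and the fields are plain strings for brevity.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for a status condition; not the operator's actual class. */
final class StatusCondition {
    final String type;
    final String status;
    final String reason;
    final String message;

    StatusCondition(String type, String status, String reason, String message) {
        this.type = type;
        this.status = status;
        this.reason = reason;
        this.message = message;
    }

    /** Two conditions carry the same content when status, reason and message all match. */
    boolean sameContentAs(StatusCondition other) {
        return Objects.equals(status, other.status)
                && Objects.equals(reason, other.reason)
                && Objects.equals(message, other.message);
    }
}

/** Hypothetical helper that updates a condition list in place instead of replacing it. */
final class ConditionMerger {
    private ConditionMerger() {}

    /**
     * Adds the condition, or replaces the existing condition of the same type.
     * Conditions of other types are left untouched.
     *
     * @return true only if the list actually changed
     */
    static boolean upsert(List<StatusCondition> conditions, StatusCondition updated) {
        for (int i = 0; i < conditions.size(); i++) {
            StatusCondition existing = conditions.get(i);
            if (Objects.equals(existing.type, updated.type)) {
                if (existing.sameContentAs(updated)) {
                    return false; // identical content: no status change to report
                }
                conditions.set(i, updated);
                return true;
            }
        }
        conditions.add(updated);
        return true;
    }
}
```

Returning a boolean lets the caller skip a status update when nothing changed, which is one way to avoid inflating the number of status changes that the comment mentions.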
Re: [PR] Add HBase connector 3.0.0 [flink-web]
MartijnVisser merged PR #591: URL: https://github.com/apache/flink-web/pull/591
[jira] [Commented] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.
[ https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803524#comment-17803524 ]
Feng Jin commented on FLINK-33996:
[~xuyangzhong] Introducing a new rule may be a better way; otherwise we would need to modify the multiple rules listed above.
> Support disabling project rewrite when multiple exprs in the project reference the same sub project field.
>
> Key: FLINK-33996
> URL: https://issues.apache.org/jira/browse/FLINK-33996
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.18.0
> Reporter: Feng Jin
> Priority: Major
>
> When multiple top projects reference the same bottom project, project rewrite rules may result in complex projects being calculated multiple times.
> Take the following SQL as an example:
> {code:sql}
> create table test_source(a varchar) with ('connector'='datagen');
> explain plan for select a || 'a' as a, a || 'b' as b FROM (select REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
> {code}
> The final SQL plan is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>    +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), _UTF-16LE'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> {code}
> It can be observed that after the project rewrite, REGEXP_REPLACE is calculated twice.
> Generally speaking, regular expression matching is a time-consuming operation and we usually do not want it to be calculated multiple times. Therefore, for this scenario, we can support disabling project rewrite.
> After disabling some rules, the final plan we obtained is as follows:
> {code:sql}
> == Abstract Syntax Tree ==
> LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
> +- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
>    +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
> == Optimized Physical Plan ==
> Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
>    +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> == Optimized Execution Plan ==
> Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
> +- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
>    +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
> {code}
> After testing, we probably need to modify these few rules:
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
> org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule
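The condition under which merging two projects duplicates work can be illustrated with a small, planner-independent sketch. It is not Flink or Calcite code: `ProjectMergeCheck`, its parameters and the notion of an "expensive" expression are assumptions made only for this example. The idea is to count how often the top project references each bottom field, and to flag the merge when an expensive bottom expression is referenced more than once.

```java
import java.util.List;

/** Hypothetical helper; it models projects as index references, not as planner nodes. */
final class ProjectMergeCheck {
    private ProjectMergeCheck() {}

    /**
     * @param topReferences for each top expression, the indices of the bottom fields it reads
     * @param bottomIsExpensive for each bottom expression, whether re-evaluating it is costly
     *     (for example a regular expression call, as opposed to a plain field reference)
     * @return true if inlining the bottom project would evaluate an expensive expression twice
     */
    static boolean mergeWouldDuplicateWork(List<int[]> topReferences, boolean[] bottomIsExpensive) {
        int[] referenceCount = new int[bottomIsExpensive.length];
        for (int[] refs : topReferences) {
            for (int index : refs) {
                referenceCount[index]++;
            }
        }
        for (int i = 0; i < bottomIsExpensive.length; i++) {
            if (bottomIsExpensive[i] && referenceCount[i] > 1) {
                return true;
            }
        }
        return false;
    }
}
```

For the query in the issue, both top expressions (`a || 'a'` and `a || 'b'`) read bottom field 0, which is the `REGEXP_REPLACE` call, so the check returns true and the merge would be skipped.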
[jira] [Updated] (FLINK-33996) Support disabling project rewrite when multiple exprs in the project reference the same sub project field.
[ https://issues.apache.org/jira/browse/FLINK-33996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Feng Jin updated FLINK-33996:
Description:
When multiple top projects reference the same bottom project, project rewrite rules may result in complex projects being calculated multiple times.
Take the following SQL as an example:
{code:sql}
create table test_source(a varchar) with ('connector'='datagen');
explain plan for select a || 'a' as a, a || 'b' as b FROM (select REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
{code}
The final SQL plan is as follows:
{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
== Optimized Physical Plan ==
Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), _UTF-16LE'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
== Optimized Execution Plan ==
Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
{code}
It can be observed that after the project rewrite, REGEXP_REPLACE is calculated twice.
Generally speaking, regular expression matching is a time-consuming operation and we usually do not want it to be calculated multiple times. Therefore, for this scenario, we can support disabling project rewrite.
After disabling some rules, the final plan we obtained is as follows:
{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
== Optimized Physical Plan ==
Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
== Optimized Execution Plan ==
Calc(select=[||(a, 'a') AS a, ||(a, 'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, 'aaa', 'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
{code}
After testing, we probably need to modify these few rules:
org.apache.flink.table.planner.plan.rules.logical.FlinkProjectMergeRule
org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule
org.apache.flink.table.planner.plan.rules.logical.FlinkProjectCalcMergeRule
was:
When multiple top projects reference the same bottom project, project rewrite rules may result in complex projects being calculated multiple times.
Take the following SQL as an example:
{code:sql}
create table test_source(a varchar) with ('connector'='datagen');
explain plan for select a || 'a' as a, a || 'b' as b FROM (select REGEXP_REPLACE(a, 'aaa', 'bbb') as a FROM test_source);
{code}
The final SQL plan is as follows:
{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
== Optimized Physical Plan ==
Calc(select=[||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), _UTF-16LE'a') AS a, ||(REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb'), _UTF-16LE'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
== Optimized Execution Plan ==
Calc(select=[||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'a') AS a, ||(REGEXP_REPLACE(a, 'aaa', 'bbb'), 'b') AS b])
+- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
{code}
It can be observed that after the project rewrite, REGEXP_REPLACE is calculated twice.
Generally speaking, regular expression matching is a time-consuming operation and we usually do not want it to be calculated multiple times. Therefore, for this scenario, we can support disabling project rewrite.
After disabling some rules, the final plan we obtained is as follows:
{code:sql}
== Abstract Syntax Tree ==
LogicalProject(a=[||($0, _UTF-16LE'a')], b=[||($0, _UTF-16LE'b')])
+- LogicalProject(a=[REGEXP_REPLACE($0, _UTF-16LE'aaa', _UTF-16LE'bbb')])
   +- LogicalTableScan(table=[[default_catalog, default_database, test_source]])
== Optimized Physical Plan ==
Calc(select=[||(a, _UTF-16LE'a') AS a, ||(a, _UTF-16LE'b') AS b])
+- Calc(select=[REGEXP_REPLACE(a, _UTF-16LE'aaa', _UTF-16LE'bbb') AS a])
   +- TableSourceScan(table=[[default_catalog, default_database, test_source]], fields=[a])
== Optimized Execution Plan ==
Calc(select=[||(a, 'a') AS
Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]
pnowojski commented on code in PR #23908: URL: https://github.com/apache/flink/pull/23908#discussion_r1442770858 ## docs/content/docs/ops/traces.md: ## @@ -97,7 +97,7 @@ Flink reports a single span trace for the whole checkpoint once checkpoint reach - org.apache.flink.runtime.checkpoint.CheckpointStatsTracker + org.apache.flink.runtime.checkpoint.CheckpointStatsTracker Review Comment: I've added line breaks to the scope. More readable and the effect is the same.
Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]
pnowojski commented on code in PR #23908: URL: https://github.com/apache/flink/pull/23908#discussion_r1442749273 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java: ## @@ -155,7 +167,12 @@ public CheckpointStatsSnapshot createSnapshot() { counts.createSnapshot(), summary.createSnapshot(), history.createSnapshot(), -latestRestoredCheckpoint); +jobInitializationMetricsBuilder +.map( +JobInitializationMetricsBuilder + ::buildRestoredCheckpointStats) +.orElse(Optional.empty()) +.orElse(null)); Review Comment: Ahh, nice!
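The chain in the hunk unwraps an `Optional` that itself contains an `Optional`. The sketch below reproduces that shape with a hypothetical `MetricsBuilderSketch` class (the real builder and its return type are not shown in this message, so a `String` stands in for the stats object) and compares it with the `flatMap` form from the standard `java.util.Optional` API, which yields the same result.

```java
import java.util.Optional;

/** Hypothetical stand-in for the builder in the hunk; only the Optional-returning shape matters. */
final class MetricsBuilderSketch {
    private final String restoredStats;

    MetricsBuilderSketch(String restoredStats) {
        this.restoredStats = restoredStats;
    }

    /** Returns the restored stats, or empty if nothing was restored. */
    Optional<String> buildRestoredCheckpointStats() {
        return Optional.ofNullable(restoredStats);
    }
}

final class OptionalUnwrap {
    private OptionalUnwrap() {}

    /** Same shape as the hunk: map, then unwrap the outer and the inner Optional. */
    static String viaMapAndOrElse(Optional<MetricsBuilderSketch> builder) {
        return builder
                .map(MetricsBuilderSketch::buildRestoredCheckpointStats)
                .orElse(Optional.empty())
                .orElse(null);
    }

    /** Equivalent form: flatMap collapses the nested Optional in one step. */
    static String viaFlatMap(Optional<MetricsBuilderSketch> builder) {
        return builder.flatMap(MetricsBuilderSketch::buildRestoredCheckpointStats).orElse(null);
    }
}
```

Both variants return null when either the builder or the restored stats are absent; which one reads better is a matter of taste.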
[jira] [Updated] (FLINK-34001) doc of "Configure Operator-level State TTL" error
[ https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser updated FLINK-34001:
Component/s: Documentation, Table SQL / API (was: API / State Processor)
> doc of "Configure Operator-level State TTL" error
>
> Key: FLINK-34001
> URL: https://issues.apache.org/jira/browse/FLINK-34001
> Project: Flink
> Issue Type: Bug
> Components: Documentation, Table SQL / API
> Affects Versions: 1.18.0
> Reporter: yong yang
> Priority: Major
>
> The doc at https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time says:
> The current TTL value for both left and right side is {{"0 ms"}}, which means the state retention is not enabled.
>
> However, in my test I found:
> The current TTL value for both left and right side is {{"0 ms"}}, which means the state is kept permanently!
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl commented on code in PR #79: URL: https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1442730232 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java: ## @@ -96,28 +97,115 @@ public JdbcDynamicTableSource( public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { // JDBC only support non-nested look up keys String[] keyNames = new String[context.getKeys().length]; + for (int i = 0; i < keyNames.length; i++) { int[] innerKeyArr = context.getKeys()[i]; Preconditions.checkArgument( innerKeyArr.length == 1, "JDBC only support non-nested look up keys"); keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]); } + final RowType rowType = (RowType) physicalRowDataType.getLogicalType(); + +String[] conditions = null; + +if (this.resolvedPredicates != null) { +conditions = new String[this.resolvedPredicates.size()]; +int processedPushdownParamsIndex = 0; +for (int i = 0; i < this.resolvedPredicates.size(); i++) { +String resolvedPredicate = this.resolvedPredicates.get(i); + +/* + * This replace seems like it should be using a Flink class to resolve the parameter. It does not + * effect the dialects as the placeholder comes from JdbcFilterPushdownPreparedStatementVisitor. + * + * Here is what has been considered as alternatives. + * + * We cannot use the way this is done in getScanRuntimeProvider, as the index we have is the index + * into the filters, but it needs the index into the fields. For example one lookup key and one filter + * would both have an index of 0, which the subsequent code would incorrectly resolve to the first + * field. + * We cannot use the PreparedStatement as we have not got access to the statement here. + * We cannot use ParameterizedPredicate as it takes the filter expression as input (e.g EQUALS(...) + * not the form we have here an example would be ('field1'= ?). 
+ * + * An entry in the resolvedPredicates list may have more than one associated pushdown parameter, for example + * a query like this : ... on e.type = 2 and (e.age = 50 OR height > 90) and a.ip = e.ip; + * will have 2 resolvedPredicates and 3 pushdownParams. The 2nd and 3rd pushdownParams will be for the second + * resolvedPredicate. + * + */ +ArrayList paramsForThisPredicate = new ArrayList(); +char placeholderChar = + JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER +.charAt(0); + +int count = +(int) resolvedPredicate.chars().filter(ch -> ch == placeholderChar).count(); + +for (int j = processedPushdownParamsIndex; +j < processedPushdownParamsIndex + count; +j++) { + paramsForThisPredicate.add(this.pushdownParams[j].toString()); +} +processedPushdownParamsIndex = processedPushdownParamsIndex + count; Review Comment: I have implemented an approach that I think is much more reliable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
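The placeholder-counting approach described in the code comment can be illustrated in isolation. This is a minimal sketch, not the connector's code: `PushdownParamAssigner` is a hypothetical helper, and the `?` placeholder is passed in as a parameter rather than read from `JdbcFilterPushdownPreparedStatementVisitor`. Each predicate consumes as many parameters, in order, as it contains placeholders.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing how pushdown parameters map onto predicates by position. */
final class PushdownParamAssigner {
    private PushdownParamAssigner() {}

    /** Counts how many times the placeholder character occurs in the predicate. */
    static int countPlaceholders(String predicate, char placeholder) {
        return (int) predicate.chars().filter(ch -> ch == placeholder).count();
    }

    /**
     * Splits the flat parameter array into one list per predicate.
     *
     * @throws IllegalArgumentException if the placeholder total does not match the parameter count
     */
    static List<List<String>> assign(List<String> predicates, Object[] params, char placeholder) {
        List<List<String>> result = new ArrayList<>();
        int next = 0; // index of the first parameter not yet consumed
        for (String predicate : predicates) {
            int count = countPlaceholders(predicate, placeholder);
            if (next + count > params.length) {
                throw new IllegalArgumentException("More placeholders than parameters");
            }
            List<String> forThisPredicate = new ArrayList<>();
            for (int j = next; j < next + count; j++) {
                forThisPredicate.add(String.valueOf(params[j]));
            }
            next += count;
            result.add(forThisPredicate);
        }
        if (next != params.length) {
            throw new IllegalArgumentException("More parameters than placeholders");
        }
        return result;
    }
}
```

With the predicates `type = ?` and `(age = ? OR height > ?)` and the parameters 2, 50 and 90, the first predicate receives `2` and the second receives `50` and `90`, matching the example in the comment. One limitation of counting characters is that a `?` inside a string literal would be miscounted.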
Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]
RocMarshal commented on code in PR #2: URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1442655533 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java: ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.datasource.statements; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record. Review Comment: How about ``` * Sets {@link PreparedStatement} parameters that's used in JDBC Sink based on a specified type of the record. ``` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28915) Make application mode could support remote DFS schema(e.g. S3, OSS, HDFS, etc.)
[ https://issues.apache.org/jira/browse/FLINK-28915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803501#comment-17803501 ]
Márton Balassi commented on FLINK-28915:
Due to the [inactivity|https://github.com/apache/flink/pull/20779#issuecomment-1878422305] on the PRs, [~ferenc-csaky] has offered to finish this implementation using the original commit of [~hjw] as a base.
> Make application mode could support remote DFS schema(e.g. S3, OSS, HDFS, etc.)
>
> Key: FLINK-28915
> URL: https://issues.apache.org/jira/browse/FLINK-28915
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / Kubernetes, flink-contrib
> Reporter: hjw
> Assignee: hjw
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> As the Flink documentation shows, local is the only supported scheme in native k8s deployment.
> Is there a plan to support the S3 filesystem? Thanks.
Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]
davidradl closed pull request #79: [FLINK-33365] include filters with Lookup joins URL: https://github.com/apache/flink-connector-jdbc/pull/79
Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]
JunRuiLee commented on code in PR #24025: URL: https://github.com/apache/flink/pull/24025#discussion_r1442718242 ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -1201,6 +1212,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) { .getOptional(ExecutionOptions.SNAPSHOT_COMPRESSION) .ifPresent(this::setUseSnapshotCompression); RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy); +configuration +.getOptional(RestartStrategyOptions.RESTART_STRATEGY) +.ifPresent(s -> this.setRestartStrategy(configuration)); Review Comment: Yes, you're right. I misunderstood the code when I was reviewing it; sorry about that. I've fixed the error now.
Re: [PR] [FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143) [flink-connector-jdbc]
RocMarshal commented on code in PR #2: URL: https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1442712519 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java: ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.datasource.statements; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record. + */ +@PublicEvolving +public interface JdbcQueryStatement extends Serializable { +String query(); + +void map(PreparedStatement ps, T data) throws SQLException; Review Comment: - `data` -> `record` - The method described by the interface here is more like filling the data of a record into a PreparedStatement. So how about - changing the interface name to something like `JdbcStatementFiller` or `JdbcStatementRender` - changing the method name to `fill` or `render` ?
## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/statements/JdbcQueryStatement.java: ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.datasource.statements; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of record. Review Comment: How about ``` * Sets {@link PreparedStatement} parameters that's used in JDBC Sink based on a specified type of the record. ``` ? 
## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java: ## @@ -0,0 +1,271 @@ +package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +import javax.transaction.xa.Xid; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +import static org.apache.flink.util.StringUtils.byteToHexString; + +/** A simple {@link Xid} implementation. */ +public class TransactionId implements Xid, Serializable { + +private static final long serialVersionUID = 1L; + +private static final int FORMAT_ID = 202; + +private final byte[] jobId; +private final int subtaskId; +private final int numberOfSubtasks; +private final long checkpointId; +private final int attempts; +private final boolean restored; + +private TransactionId( +byte[] jobId, +int subtaskId, +int numberOfSubtasks, +long checkpointId, +int attempts, +boolean restored) { +this.jobId = jobId; +
Re: [PR] [FLINK-20281][table] support consuming cdc stream about window aggregate [flink]
flinkbot commented on PR #24030: URL: https://github.com/apache/flink/pull/24030#issuecomment-1878444332 ## CI report: * fa4675861793c8cd42d7ece15334879b2ec54953 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-20281][table] support consuming cdc stream about window aggregate [flink]
xuyangzhong opened a new pull request, #24030: URL: https://github.com/apache/flink/pull/24030
## What is the purpose of the change
Currently, window aggregation does not support consuming a changelog stream. See more at [FLINK-20281](https://issues.apache.org/jira/browse/FLINK-20281) and [FLINK-27539](https://issues.apache.org/jira/browse/FLINK-27539)
## Brief change log
*(for example:)*
- *Update the logic for inferring ModifyKind and UpdateKind for the window agg node*
- *Introduce a count in window agg; when there is no data in the window, do not output the agg result*
- *Add tests for TUMBLE, HOP and CUMULATE window TVFs consuming a cdc source*
## Verifying this change
Tests are added for TUMBLE, HOP and CUMULATE window TVFs consuming a cdc source.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? no need to update doc
[jira] [Updated] (FLINK-20281) Window aggregation supports changelog stream input
[ https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-20281: --- Labels: auto-deprioritized-major auto-deprioritized-minor pull-request-available (was: auto-deprioritized-major auto-deprioritized-minor) > Window aggregation supports changelog stream input > -- > > Key: FLINK-20281 > URL: https://issues.apache.org/jira/browse/FLINK-20281 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner, Table SQL / Runtime >Reporter: Jark Wu >Assignee: xuyang >Priority: Major > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > Attachments: screenshot-1.png > > > Currently, window aggregation doesn't support to consume a changelog stream. > This makes it impossible to do a window aggregation on changelog sources > (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]
1996fanrui commented on code in PR #24025: URL: https://github.com/apache/flink/pull/24025#discussion_r1442706214 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java: ## @@ -453,7 +440,10 @@ public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) { throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1"); } -this.bufferTimeout = timeoutMillis; +if (timeoutMillis == ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT) { +this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, false); +} +this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT, Duration.ofMillis(timeoutMillis)); return this; Review Comment: Thanks for the clarification. It's fine for me.
Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]
1996fanrui commented on code in PR #24025: URL: https://github.com/apache/flink/pull/24025#discussion_r1442704443 ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -1201,6 +1212,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) { .getOptional(ExecutionOptions.SNAPSHOT_COMPRESSION) .ifPresent(this::setUseSnapshotCompression); RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy); +configuration +.getOptional(RestartStrategyOptions.RESTART_STRATEGY) +.ifPresent(s -> this.setRestartStrategy(configuration)); Review Comment: I don't understand why it is being removed. If it's removed, no one calls `private void setRestartStrategy(ReadableConfig configuration) {`, right? And then the options cannot be added to the configuration of ExecutionConfig. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28915] Flink Native k8s mode jar localtion support s3 schema. [flink]
mbalassi commented on PR #20779: URL: https://github.com/apache/flink/pull/20779#issuecomment-1878422305 Hi @SwimSweet, given your lack of response @ferenc-csaky offered to build on your work and take it forward based on my comments above. Hope that is OK, he is planning to post an updated PR next week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803484#comment-17803484 ] Rui Fan commented on FLINK-33856: - {quote}I would suggest that both of you chat offline about the scope of the changes in Rui Fan's FLIP and/or eventual division of work. I'm not sure if Rui Fan plans to add per task/subtask spans for checkpoints and/or recovery. {quote} FLIP-412 focuses on adding a span for the time consumed by each stage when starting the Flink job. As I currently envision it, the spans will be at the job level rather than the subtask or task level. > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > > When Flink takes a checkpoint, the performance of its interaction with the external > file system has a large impact on the overall checkpoint duration. Adding performance > indicators where the task interacts with the external file storage system would therefore > make bottlenecks easy to observe. These include: the file write rate, the latency of > writing the file, and the latency of closing the file. > Adding these metrics on the Flink side has the following advantages: it is convenient to > collect statistics on the end-to-end time of different tasks, and there is no need to > distinguish between types of external storage system, because the metrics can be unified in > FsCheckpointStreamFactory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
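The three indicators named in the issue description (write rate, write latency, close latency) could be collected by wrapping the stream that writes checkpoint data. The following is only a minimal sketch under stated assumptions: `TimedOutputStream` and its accessors are hypothetical names, not part of Flink, and a real implementation would report into Flink's metric groups rather than plain fields.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical wrapper (not a Flink class): times writes and close on any OutputStream.
final class TimedOutputStream extends OutputStream {
    private final OutputStream delegate;
    private long bytesWritten; // total bytes passed to the delegate
    private long writeNanos;   // accumulated time spent inside write calls
    private long closeNanos;   // time spent inside close()

    TimedOutputStream(OutputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(int b) throws IOException {
        long start = System.nanoTime();
        delegate.write(b);
        writeNanos += System.nanoTime() - start;
        bytesWritten++;
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        long start = System.nanoTime();
        delegate.write(buf, off, len);
        writeNanos += System.nanoTime() - start;
        bytesWritten += len;
    }

    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    @Override
    public void close() throws IOException {
        long start = System.nanoTime();
        delegate.close();
        closeNanos = System.nanoTime() - start;
    }

    long bytesWritten() { return bytesWritten; }
    long writeNanos() { return writeNanos; }
    long closeNanos() { return closeNanos; }

    // Write rate in bytes per second; 0 if no measurable write time was recorded.
    double bytesPerSecond() {
        return writeNanos == 0 ? 0.0 : bytesWritten * 1e9 / writeNanos;
    }

    public static void main(String[] args) throws IOException {
        TimedOutputStream out = new TimedOutputStream(new ByteArrayOutputStream());
        out.write(new byte[1024]);
        out.close();
        System.out.println("bytes=" + out.bytesWritten()
                + " writeNanos=" + out.writeNanos()
                + " closeNanos=" + out.closeNanos());
    }
}
```

Because the wrapper only depends on `java.io.OutputStream`, it does not need to know which external storage system sits behind the stream, which is the advantage the description attributes to doing this in one place.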
[jira] [Updated] (FLINK-34001) doc of "Configure Operator-level State TTL" error
[ https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yong yang updated FLINK-34001: -- Summary: doc of "Configure Operator-level State TTL" error (was: doc diff from code) > doc of "Configure Operator-level State TTL" error > - > > Key: FLINK-34001 > URL: https://issues.apache.org/jira/browse/FLINK-34001 > Project: Flink > Issue Type: Bug > Components: API / State Processor >Affects Versions: 1.18.0 >Reporter: yong yang >Priority: Major > > doc: > https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time > The current TTL value for both left and right side is {{"0 ms"}}, which > means the state retention is not enabled. > > But my testing found: > The current TTL value for both left and right side is {{"0 ms"}}, which > means the state is kept permanently! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34001) doc diff from code
[ https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803483#comment-17803483 ] yong yang commented on FLINK-34001: --- {code:java} // code placeholder // plan1.json { "flinkVersion" : "1.18", "nodes" : [ { "id" : 7, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { "identifier" : "`default_catalog`.`default_database`.`s1`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "id", "dataType" : "VARCHAR(2147483647)" }, { "name" : "name", "dataType" : "VARCHAR(2147483647)" } ], "watermarkSpecs" : [ ] }, "partitionKeys" : [ ], "options" : { "properties.bootstrap.servers" : "localhost:9092", "connector" : "kafka", "format" : "csv", "topic" : "s1", "properties.group.id" : "g1", "scan.startup.mode" : "latest-offset" } } } }, "outputType" : "ROW<`id` VARCHAR(2147483647), `name` VARCHAR(2147483647)>", "description" : "TableSourceScan(table=[[default_catalog, default_database, s1]], fields=[id, name])", "inputProperties" : [ ] }, { "id" : 8, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { "type" : "HASH", "keys" : [ 0 ] }, "damBehavior" : "PIPELINED", "priority" : 0 } ], "outputType" : "ROW<`id` VARCHAR(2147483647), `name` VARCHAR(2147483647)>", "description" : "Exchange(distribution=[hash[id]])" }, { "id" : 9, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { "identifier" : "`default_catalog`.`default_database`.`s2`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "id", "dataType" : "VARCHAR(2147483647) NOT NULL" }, { "name" : "age", "dataType" : "INT" } ], "watermarkSpecs" : [ ], "primaryKey" : { "name" : "PK_3386", "type" : "PRIMARY_KEY", "columns" : [ "id" ] } }, "partitionKeys" : [ ], "options" : { "properties.bootstrap.servers" : "localhost:9092", "key.format" : "csv", "topic" : "s3", "connector" : "upsert-kafka", "value.format" : "csv" } } } }, "outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, 
`age` INT>", "description" : "TableSourceScan(table=[[default_catalog, default_database, s2]], fields=[id, age])", "inputProperties" : [ ] }, { "id" : 10, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { "type" : "HASH", "keys" : [ 0 ] }, "damBehavior" : "PIPELINED", "priority" : 0 } ], "outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>", "description" : "Exchange(distribution=[hash[id]])" }, { "id" : 11, "type" : "stream-exec-changelog-normalize_1", "configuration" : { "table.exec.mini-batch.enabled" : "false", "table.exec.mini-batch.size" : "-1" }, "uniqueKeys" : [ 0 ], "generateUpdateBefore" : true, "state" : [ { "index" : 0, "ttl" : "0 ms", "name" : "changelogNormalizeState" } ], "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" }, "damBehavior" : "PIPELINED", "priority" : 0 } ], "outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>", "description" : "ChangelogNormalize(key=[id])" }, { "id" : 12, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { "type" : "HASH", "keys" : [ 0 ] }, "damBehavior" : "PIPELINED", "priority" : 0 } ], "outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>", "description" : "Exchange(distribution=[hash[id]])" }, { "id" : 13, "type" : "stream-exec-join_1", "joinSpec" : { "joinType" : "INNER", "leftKeys" : [ 0 ], "rightKeys" : [ 0 ], "filterNulls" : [ true ], "nonEquiCondition" : null }, "rightUpsertKeys" : [ [ 0 ] ], "state" : [ { "index" : 0, "ttl" : "0 ms", "name" : "leftState" }, { "index" : 1, "ttl" : "0 ms", "name" : "rightState" } ], "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" }, "damBehavior" : "PIPELINED", "priority" : 0 }, { "requiredDistribution" : { "type" : "UNKNOWN" }, "damBehavior" : "PIPELINED",
[jira] [Commented] (FLINK-34001) doc diff from code
[ https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803482#comment-17803482 ] yong yang commented on FLINK-34001: ---
{code:java}
// code placeholder
package com.yy.state.OperatorStateTTL

import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}

import java.time.ZoneId
import scala.util.Random

/**
 * See how EXECUTE PLAN and StatementSet can be combined.
 * When adapting for the Flink web UI and submitting the Flink job as a packaged jar, it fails with:
 * Cannot have more than one execute() or executeAsync() call in a single environment
 * Running it in IDEA works without problems.
 * Reference: https://blog.csdn.net/tianlangstudio/article/details/123086300
 */
object TTLDemoV2 {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration
    conf.setInteger(RestOptions.PORT, 28080)
    // Start from the specified checkpoint; if not configured, start without state
    // conf.setString("execution.savepoint.path","file:///Users/thomas990p/checkpointdir/41f8644ad4492002188d8d4e1009/chk-136")

    // Do not use a local environment; with the approach below, multiple executeSql calls can be used and all take effect
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Retain checkpoint data when the local job is stopped
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setParallelism(1)
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStateBackend(new FsStateBackend("file:///Users/thomas990p/checkpointdir"))
    env.disableOperatorChaining() // disable operator chaining globally

    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // Set the China time zone
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    tEnv.executeSql(
      """
        |CREATE TABLE s1 (
        | id String
        | ,name string
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 's1',
        | 'scan.startup.mode' = 'latest-offset',
        | 'properties.group.id' = 'g1',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'format' = 'csv'
        |)
        |""".stripMargin)

    /*
     Configuring 'scan.startup.mode' = 'latest-offset' is not allowed here;
     otherwise it fails with: Unsupported options found for 'upsert-kafka'.
     It must consume from earliest, and this cannot be changed.
     */
    tEnv.executeSql(
      """
        |CREATE TABLE s2 (
        | id String
        | ,age int
        | ,primary key(id) not enforced
        |) WITH (
        | 'connector' = 'upsert-kafka',
        | 'topic' = 's3',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'key.format' = 'csv'
        | ,'value.format' = 'csv'
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |CREATE TABLE s1_sink1 (
        | id String
        | ,name string
        |) WITH (
        | 'connector' = 'print'
        | ,'print-identifier'='s1 sink'
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |CREATE TABLE s2_sink1 (
        | id String
        | ,age int
        |) WITH (
        | 'connector' = 'print'
        | ,'print-identifier'='s2 sink'
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |CREATE TABLE sink1 (
        | id String
        | ,name string
        | ,age int
        |) WITH (
        | 'connector' = 'print'
        | ,'print-identifier'=''
        |)
        |""".stripMargin)

    tEnv.executeSql("insert into s1_sink1 select * from s1")
    tEnv.executeSql("insert into s2_sink1 select * from s2")
    tEnv.executeSql("EXECUTE PLAN 'file:///Users/thomas990p/flink-plain/plan1.json' ") // the two "0 ms" TTLs keep the left and right state forever
  }
}
{code}
> doc diff from code > -- > > Key: FLINK-34001 > URL: https://issues.apache.org/jira/browse/FLINK-34001 > Project: Flink > Issue Type: Bug > Components: API / State Processor >Affects Versions: 1.18.0 >Reporter: yong yang >Priority: Major > > doc: > https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time > The current TTL value for both left and right side is {{"0 ms"}}, which > means the state retention is not enabled. > > But my testing found: > The current TTL value for both left and right side is {{"0 ms"}}, which > means the state is kept permanently! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34001) doc diff from code
yong yang created FLINK-34001: - Summary: doc diff from code Key: FLINK-34001 URL: https://issues.apache.org/jira/browse/FLINK-34001 Project: Flink Issue Type: Bug Components: API / State Processor Affects Versions: 1.18.0 Reporter: yong yang doc: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time The current TTL value for both left and right side is {{"0 ms"}}, which means the state retention is not enabled. But my testing found: The current TTL value for both left and right side is {{"0 ms"}}, which means the state is kept permanently! -- This message was sent by Atlassian Jira (v8.20.10#820010)
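The two statements in this report may describe the same behavior: if state retention is "not enabled", nothing ever expires the state, so it is kept indefinitely. The sketch below illustrates that reading only. `StateTtlSketch` and `isExpired` are hypothetical names rather than Flink code, and the zero-means-never-expire rule is an assumption that matches what the reporter observed.

```java
import java.time.Duration;

// Hypothetical illustration (not Flink code) of a TTL check where zero disables expiry.
final class StateTtlSketch {

    // Returns true if state last accessed at lastAccessMillis should be dropped at nowMillis.
    static boolean isExpired(Duration ttl, long lastAccessMillis, long nowMillis) {
        if (ttl.isZero()) {
            // Assumption: a TTL of "0 ms" means retention is disabled, so state never expires.
            return false;
        }
        return nowMillis - lastAccessMillis >= ttl.toMillis();
    }

    public static void main(String[] args) {
        // With "0 ms", even very old state is still kept.
        System.out.println(isExpired(Duration.ZERO, 0L, 1_000_000L));
        // With a 1 s TTL, state untouched for 1 s is dropped.
        System.out.println(isExpired(Duration.ofSeconds(1), 0L, 1_000L));
    }
}
```

Under this reading the documentation is ambiguous rather than wrong, and wording such as "state is never cleaned up" would remove the ambiguity.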
[jira] [Closed] (FLINK-33993) Ineffective scaling detection events are misleading
[ https://issues.apache.org/jira/browse/FLINK-33993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels closed FLINK-33993. -- Resolution: Fixed > Ineffective scaling detection events are misleading > --- > > Key: FLINK-33993 > URL: https://issues.apache.org/jira/browse/FLINK-33993 > Project: Flink > Issue Type: Bug > Components: Autoscaler, Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > When the ineffective scaling decision feature is turned off, events are > regenerated which look like this: > {noformat} > Skipping further scale up after ineffective previous scale up for > 65c763af14a952c064c400d516c25529 > {noformat} > This is misleading because no action will be taken. It is fair to inform > users about ineffective scale up even when the feature is disabled but a > different message should be printed to convey that no action will be taken. -- This message was sent by Atlassian Jira (v8.20.10#820010)
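The behavior requested in this issue amounts to choosing the event text based on whether the feature is enabled. A minimal sketch follows, assuming hypothetical names: `IneffectiveScalingMessage`, `eventMessage`, and the wording of the "disabled" message are illustrative and are not taken from the operator's code. Only the "Skipping further scale up" text comes from the issue.

```java
// Hypothetical illustration (not the operator's code) of picking an event message by feature flag.
final class IneffectiveScalingMessage {

    static String eventMessage(boolean detectionEnabled, String vertexId) {
        if (detectionEnabled) {
            // Text quoted in the issue: scaling is actually blocked in this case.
            return "Skipping further scale up after ineffective previous scale up for " + vertexId;
        }
        // Assumed wording: informs the user without implying that any action is taken.
        return "Ineffective scale up detected for " + vertexId
                + ", but no action is taken because ineffective scaling detection is disabled";
    }

    public static void main(String[] args) {
        String vertexId = "65c763af14a952c064c400d516c25529";
        System.out.println(eventMessage(true, vertexId));
        System.out.println(eventMessage(false, vertexId));
    }
}
```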
Re: [PR] [FLINK-33993] Fix misleading events for scaling effectiveness detection [flink-kubernetes-operator]
mxm commented on PR #748: URL: https://github.com/apache/flink-kubernetes-operator/pull/748#issuecomment-1878408347 Thanks for the quick review Rui! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803478#comment-17803478 ] Piotr Nowojski edited comment on FLINK-33856 at 1/5/24 9:58 AM: {quote} Maybe a new FLIP that supports a task-level trace reporter can be built? I’m willing to participate in the development. {quote} Please check the FLIP-384 discussions again. I highlighted a couple of difficulties there: {quote} However, if we would like to create true distributed traces, with spans reported from many different components, potentially both on JM and TM, the problem is a bit deeper. The issue in that case is how to actually fill out `parent_id` and `trace_id`? Passing some context entity as a Java object would be unfeasible. That would require too many changes in too many places. I think the only realistic way to do it would be to have a deterministic generator of `parent_id` and `trace_id` values. For example we could create the parent trace/span of the checkpoint on JM, and set those ids to something like: `jobId#attemptId#checkpointId`. Each subtask then could re-generate those ids and subtasks' checkpoint span would have an id of `jobId#attemptId#checkpointId#subTaskId`. Note that this is just an example, as most likely distributed spans for checkpointing do not make sense, as we can generate them much more easily on the JM anyway. {quote} https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4 At the same time: {quote} I am worried that a large amount of data aggregation to JM may have performance problems. {quote} I wouldn't worry about that too much. This data is already aggregated on the JM from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. Besides, it's just a single RPC from subtask -> JM per checkpoint. If that becomes a problem, we would have problems in many different areas as well (for example {{notifyCheckpointCompleted}} is a very similar call but in the other direction). 
Also AFAIR there are/were different ideas about how to solve this potential bottleneck in a more generic way (having multiple job coordinators in the cluster to spread the load). [~hejufang001] I would suggest that both of you chat offline about the scope of the changes in [~fanrui]'s FLIP and/or eventual division of work. I'm not sure if [~fanrui] plans to add per task/subtask spans for checkpoints and/or recovery. was (Author: pnowojski): {quote} Maybe a new FLIP that supports a task-level trace reporter can be built? I’m willing to participate in the development. {quote} Please check the FLIP-384 discussions again. I highlighted a couple of difficulties there: {quote} However, if we would like to create true distributed traces, with spans reported from many different components, potentially both on JM and TM, the problem is a bit deeper. The issue in that case is how to actually fill out `parent_id` and `trace_id`? Passing some context entity as a Java object would be unfeasible. That would require too many changes in too many places. I think the only realistic way to do it would be to have a deterministic generator of `parent_id` and `trace_id` values. For example we could create the parent trace/span of the checkpoint on JM, and set those ids to something like: `jobId#attemptId#checkpointId`. Each subtask then could re-generate those ids and subtasks' checkpoint span would have an id of `jobId#attemptId#checkpointId#subTaskId`. Note that this is just an example, as most likely distributed spans for checkpointing do not make sense, as we can generate them much more easily on the JM anyway. {quote} https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4 At the same time: {quote} I am worried that a large amount of data aggregation to JM may have performance problems. {quote} I wouldn't worry about that too much. This data is already aggregated on the JM from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. 
Besides, it's just a single RPC from subtask -> JM per checkpoint. If that becomes a problem, we would have problems in many different areas as well (for example {{notifyCheckpointCompleted}} is a very similar call but in the other direction). Also AFAIR there are/were different ideas about how to solve this potential bottleneck in a more generic way (having multiple job coordinators in the cluster to spread the load). > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions:
Re: [PR] [FLINK-33993] Fix misleading events for scaling effectiveness detection [flink-kubernetes-operator]
mxm merged PR #748: URL: https://github.com/apache/flink-kubernetes-operator/pull/748 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803478#comment-17803478 ] Piotr Nowojski edited comment on FLINK-33856 at 1/5/24 9:56 AM: {quote} Maybe a new FLIP that supports a task-level trace reporter can be built? I’m willing to participate in the development. {quote} Please check the FLIP-384 discussions again. I highlighted a couple of difficulties there: {quote} However, if we would like to create true distributed traces, with spans reported from many different components, potentially both on JM and TM, the problem is a bit deeper. The issue in that case is how to actually fill out `parent_id` and `trace_id`? Passing some context entity as a Java object would be unfeasible. That would require too many changes in too many places. I think the only realistic way to do it would be to have a deterministic generator of `parent_id` and `trace_id` values. For example we could create the parent trace/span of the checkpoint on JM, and set those ids to something like: `jobId#attemptId#checkpointId`. Each subtask then could re-generate those ids and subtasks' checkpoint span would have an id of `jobId#attemptId#checkpointId#subTaskId`. Note that this is just an example, as most likely distributed spans for checkpointing do not make sense, as we can generate them much more easily on the JM anyway. {quote} https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4 At the same time: {quote} I am worried that a large amount of data aggregation to JM may have performance problems. {quote} I wouldn't worry about that too much. This data is already aggregated on the JM from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. Besides, it's just a single RPC from subtask -> JM per checkpoint. If that becomes a problem, we would have problems in many different areas as well (for example {{notifyCheckpointCompleted}} is a very similar call but in the other direction). 
Also AFAIR there are/were different ideas about how to solve this potential bottleneck in a more generic way (having multiple job coordinators in the cluster to spread the load). was (Author: pnowojski): {quote} Maybe a new FLIP that supports a task-level trace reporter can be built? I’m willing to participate in the development. {quote} Please check the FLIP-384 discussions again. I highlighted a couple of difficulties there: {quote} However, if we would like to create true distributed traces, with spans reported from many different components, potentially both on JM and TM, the problem is a bit deeper. The issue in that case is how to actually fill out `parent_id` and `trace_id`? Passing some context entity as a Java object would be unfeasible. That would require too many changes in too many places. I think the only realistic way to do it would be to have a deterministic generator of `parent_id` and `trace_id` values. For example we could create the parent trace/span of the checkpoint on JM, and set those ids to something like: `jobId#attemptId#checkpointId`. Each subtask then could re-generate those ids and subtasks' checkpoint span would have an id of `jobId#attemptId#checkpointId#subTaskId`. Note that this is just an example, as most likely distributed spans for checkpointing do not make sense, as we can generate them much more easily on the JM anyway. {quote} https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4 At the same time: {quote} I am worried that a large amount of data aggregation to JM may have performance problems. {quote} I wouldn't worry about that too much. Those data are already aggregated on the JM from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. Besides, it's just a single RPC from subtask -> JM per checkpoint. If that becomes a problem, we would have problems in many different areas as well (for example {{notifyCheckpointCompleted}} is a very similar call but in the other direction). 
Also AFAIR there are/were different ideas about how to solve this potential bottleneck in a more generic way (having multiple job coordinators in the cluster to spread the load). > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > > When Flink makes a checkpoint, the interaction performance with the external > file system
[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803478#comment-17803478 ] Piotr Nowojski commented on FLINK-33856: {quote} Maybe a new FLIP that supports a task-level trace reporter can be built? I’m willing to participate in the development. {quote} Please check the FLIP-384 discussions again. I highlighted a couple of difficulties there: {quote} However, if we would like to create true distributed traces, with spans reported from many different components, potentially both on JM and TM, the problem is a bit deeper. The issue in that case is how to actually fill out `parent_id` and `trace_id`? Passing some context entity as a Java object would be unfeasible. That would require too many changes in too many places. I think the only realistic way to do it would be to have a deterministic generator of `parent_id` and `trace_id` values. For example we could create the parent trace/span of the checkpoint on JM, and set those ids to something like: `jobId#attemptId#checkpointId`. Each subtask then could re-generate those ids and subtasks' checkpoint span would have an id of `jobId#attemptId#checkpointId#subTaskId`. Note that this is just an example, as most likely distributed spans for checkpointing do not make sense, as we can generate them much more easily on the JM anyway. {quote} https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4 At the same time: {quote} I am worried that a large amount of data aggregation to JM may have performance problems. {quote} I wouldn't worry about that too much. Those data are already aggregated on the JM from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. Besides, it's just a single RPC from subtask -> JM per checkpoint. If that becomes a problem, we would have problems in many different areas as well (for example {{notifyCheckpointCompleted}} is a very similar call but in the other direction). 
Also AFAIR there are/were different ideas about how to solve this potential bottleneck in a more generic way (having multiple job coordinators in the cluster to spread the load). > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > > When Flink takes a checkpoint, the performance of its interaction with the external > file system has a large impact on the overall checkpoint duration. Adding performance > indicators where the task interacts with the external file storage system would therefore > make bottlenecks easy to observe. These include: the file write rate, the latency of > writing the file, and the latency of closing the file. > Adding these metrics on the Flink side has the following advantages: it is convenient to > collect statistics on the end-to-end time of different tasks, and there is no need to > distinguish between types of external storage system, because the metrics can be unified in > FsCheckpointStreamFactory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
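The deterministic ID scheme quoted in the comment above can be sketched in a few lines. This is only an illustration of the idea that the JM and each subtask can derive the same identifiers independently, without passing a context object around. The class and method names are hypothetical, and the parameter types (a string job ID, integer attempt and subtask numbers) are assumptions.

```java
// Hypothetical illustration (not Flink code) of deterministic span IDs built from known coordinates.
final class DeterministicSpanIds {

    // ID of the parent checkpoint span, in the form jobId#attemptId#checkpointId.
    static String checkpointSpanId(String jobId, int attemptId, long checkpointId) {
        return jobId + "#" + attemptId + "#" + checkpointId;
    }

    // ID of one subtask's checkpoint span: the parent ID followed by #subTaskId.
    static String subtaskSpanId(String jobId, int attemptId, long checkpointId, int subTaskId) {
        return checkpointSpanId(jobId, attemptId, checkpointId) + "#" + subTaskId;
    }

    public static void main(String[] args) {
        // Both sides can compute these from values they already know.
        System.out.println(checkpointSpanId("job-a", 0, 42L));
        System.out.println(subtaskSpanId("job-a", 0, 42L, 3));
    }
}
```

Since a subtask span ID starts with its parent's ID, a subtask can also recover the parent ID it should report without any extra communication.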
Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]
StefanRRichter commented on code in PR #23908: URL: https://github.com/apache/flink/pull/23908#discussion_r1442679306 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java: ## @@ -730,57 +739,87 @@ void restoreInternal() throws Exception { getEnvironment().getMetricGroup().getIOMetricGroup().markTaskInitializationStarted(); LOG.debug("Initializing {}.", getName()); -operatorChain = - getEnvironment().getTaskStateManager().isTaskDeployedAsFinished() -? new FinishedOperatorChain<>(this, recordWriter) -: new RegularOperatorChain<>(this, recordWriter); -mainOperator = operatorChain.getMainOperator(); +SubTaskInitializationMetricsBuilder initializationMetrics = +new SubTaskInitializationMetricsBuilder( +SystemClock.getInstance().absoluteTimeMillis()); +try { +operatorChain = + getEnvironment().getTaskStateManager().isTaskDeployedAsFinished() +? new FinishedOperatorChain<>(this, recordWriter) +: new RegularOperatorChain<>(this, recordWriter); +mainOperator = operatorChain.getMainOperator(); -getEnvironment() -.getTaskStateManager() -.getRestoreCheckpointId() -.ifPresent(restoreId -> latestReportCheckpointId = restoreId); +getEnvironment() Review Comment: Argh, yes sorry reviewing in GH sometimes stinks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]
JunRuiLee commented on PR #24025: URL: https://github.com/apache/flink/pull/24025#issuecomment-1878390963 Thanks @1996fanrui for the quick reviews, I've updated this pr, PTAL~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]
StefanRRichter commented on code in PR #23908: URL: https://github.com/apache/flink/pull/23908#discussion_r1442676217 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java: ## @@ -155,7 +167,12 @@ public CheckpointStatsSnapshot createSnapshot() { counts.createSnapshot(), summary.createSnapshot(), history.createSnapshot(), -latestRestoredCheckpoint); +jobInitializationMetricsBuilder +.map( +JobInitializationMetricsBuilder + ::buildRestoredCheckpointStats) +.orElse(Optional.empty()) +.orElse(null)); Review Comment: Ah I see, makes sense. You could probably rewrite it as flatMap().orElse() though... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
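The `flatMap().orElse()` suggestion above relies on standard `java.util.Optional` behavior: when the mapping function itself returns an `Optional`, `map` produces a nested `Optional<Optional<T>>`, whereas `flatMap` collapses it. The sketch below compares the two shapes. `Builder` and `buildRestoredStats` are hypothetical stand-ins for the builder in the diff, not the real Flink classes.

```java
import java.util.Optional;

// Hypothetical illustration of the review suggestion, using only java.util.Optional.
final class OptionalFlattenSketch {

    // Stand-in for a builder whose build method itself returns an Optional.
    static final class Builder {
        private final String restored; // null means nothing was restored

        Builder(String restored) {
            this.restored = restored;
        }

        Optional<String> buildRestoredStats() {
            return Optional.ofNullable(restored);
        }
    }

    // Shape from the diff: map() yields Optional<Optional<String>>, unwrapped in two steps.
    static String viaMap(Optional<Builder> builder) {
        return builder.map(Builder::buildRestoredStats)
                .orElse(Optional.empty())
                .orElse(null);
    }

    // Suggested shape: flatMap() removes the nesting, so a single orElse() is enough.
    static String viaFlatMap(Optional<Builder> builder) {
        return builder.flatMap(Builder::buildRestoredStats).orElse(null);
    }

    public static void main(String[] args) {
        Optional<Builder> present = Optional.of(new Builder("chk-7"));
        System.out.println(viaMap(present) + " " + viaFlatMap(present));
        System.out.println(viaMap(Optional.empty()) + " " + viaFlatMap(Optional.empty()));
    }
}
```

Both methods return the same value in all three cases (no builder, builder with nothing restored, builder with a restored value), so the rewrite is a readability change only.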
Re: [PR] [FLINK-33980][core] Reorganize job configuration [flink]
JunRuiLee commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1442675488

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java: ##

@@ -453,7 +440,10 @@ public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
 throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
 }
-this.bufferTimeout = timeoutMillis;
+if (timeoutMillis == ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT) {
+this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, false);
+}
+this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT, Duration.ofMillis(timeoutMillis));
 return this;

Review Comment:
The getter/setter API gives users a friendly way to configure this, and I have not yet found a clear, compelling reason to remove it. WDYT?
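To make the shape of the change concrete, here is a rough, self-contained sketch of a setter that keeps its public signature but writes through to a configuration object instead of a private field. Everything in it is a hypothetical stand-in: the class name, the string keys, the -1 sentinel constant, and the Map-backed configuration are assumptions for illustration, not Flink's ExecutionOptions or Configuration API. The lower-bound check is likewise inferred from the exception message in the diff.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class BufferTimeoutSetterSketch {

    // Hypothetical keys and sentinel; the real code uses typed option constants.
    static final String BUFFER_TIMEOUT_ENABLED = "sketch.buffer-timeout.enabled";
    static final String BUFFER_TIMEOUT = "sketch.buffer-timeout";
    static final long DISABLED_NETWORK_BUFFER_TIMEOUT = -1L;

    // Stand-in for the environment's configuration object.
    private final Map<String, Object> configuration = new HashMap<>();

    /** Same fluent setter signature as before, but the value is stored in the configuration. */
    public BufferTimeoutSetterSketch setBufferTimeout(long timeoutMillis) {
        // Assumed validation, inferred from the exception message shown in the diff.
        if (timeoutMillis < DISABLED_NETWORK_BUFFER_TIMEOUT) {
            throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
        }
        // The sentinel additionally flips the "enabled" flag off.
        if (timeoutMillis == DISABLED_NETWORK_BUFFER_TIMEOUT) {
            configuration.put(BUFFER_TIMEOUT_ENABLED, false);
        }
        // As in the diff, the duration is written in every case, including the sentinel.
        configuration.put(BUFFER_TIMEOUT, Duration.ofMillis(timeoutMillis));
        return this;
    }

    /** Generic read access to the stand-in configuration. */
    public Object get(String key) {
        return configuration.get(key);
    }

    public static void main(String[] args) {
        BufferTimeoutSetterSketch env = new BufferTimeoutSetterSketch().setBufferTimeout(100);
        System.out.println(env.get(BUFFER_TIMEOUT));
        env.setBufferTimeout(DISABLED_NETWORK_BUFFER_TIMEOUT);
        System.out.println(env.get(BUFFER_TIMEOUT_ENABLED));
    }
}
```

The point of the sketch is only that callers keep using the setter while the configuration becomes the single place where the value lives.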
Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]
xintongsong commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1442654920

## pom.xml: ##

@@ -2301,6 +2301,8 @@ under the License.
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.lang.Class,java.lang.Object[])
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)
+ org.apache.flink.api.common.functions.RuntimeContext
+ org.apache.flink.api.connector.source.SourceReaderContext

Review Comment:
This should not be a hotfix commit. These changes do not make sense on their own without the subsequent commits.