Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]
dmariassy commented on PR #24482: URL: https://github.com/apache/flink/pull/24482#issuecomment-2246151636 Draft PR for the alternative implementation: https://github.com/apache/flink/pull/25114 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] Protobuf confluent dynamic format [flink]
dmariassy opened a new pull request, #25114: URL: https://github.com/apache/flink/pull/25114

**This is a DRAFT PR**.

## TODO
- Add missing boilerplate (e.g. config)
- Add debezium support
- Test the format in Shopify Flink jobs
- Docs (might be a separate PR)

## What is the purpose of the change
- Add support for deserializing protobuf messages using the Confluent wire format and whose schemas can be fetched from Confluent Schema Registry
- Add support for serializing Flink records using the Confluent protobuf wire format

## Brief change log
My intention was to:
- Maintain parity with the existing flink-protobuf format's semantics in terms of the Flink -> Protobuf / Protobuf -> Flink conversions
- Maximize code reuse between flink-protobuf-confluent and flink-protobuf formats

### Deserializer
- Fetch the message's protobuf descriptor from the Confluent schema registry
- Generate a java class from the descriptor at runtime
- Deserialize `byte[]`s to the generated `protobuf.Message` type using a `io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer`
- Delegate the work of converting between a `protobuf.Message` and a `RowData` object to the existing flink-protobuf format

### Serializer
- Convert the user's `RowType` to a protobuf descriptor
- Generate a java class from the descriptor at runtime
- Delegate the `RowData` -> `AbstractMessage` conversion to the existing flink-protobuf format
- Serialize the `AbstractMessage` object using a `io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer`

## Verifying this change
Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).

This change added tests and can be verified as follows:
- Added comprehensive test coverage
- Will shortly deploy to Shopify Flink clusters

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): yes (com.github.os72:protoc-jar)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs to follow
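For readers unfamiliar with the term, the Confluent wire format mentioned in the PR description prefixes every record with a magic byte (0) and a 4-byte big-endian schema ID; for Protobuf, a list of message indexes follows before the serialized message. The PR delegates all of this to `KafkaProtobufDeserializer`, so the Java sketch below only illustrates the header layout. `ConfluentFrame` and its members are hypothetical names, not part of the PR or of Confluent's API.

```java
import java.nio.ByteBuffer;

/** Illustrative only: splits a Confluent-framed record into schema ID and remaining bytes. */
final class ConfluentFrame {
    /** Magic byte that starts every Confluent-framed record. */
    static final byte MAGIC_BYTE = 0x0;

    final int schemaId;
    /** For Protobuf: the message indexes followed by the serialized message (not decoded here). */
    final ByteBuffer remainder;

    private ConfluentFrame(int schemaId, ByteBuffer remainder) {
        this.schemaId = schemaId;
        this.remainder = remainder;
    }

    static ConfluentFrame parse(byte[] record) {
        // Header is 1 magic byte + 4 bytes of schema ID.
        if (record == null || record.length < 5) {
            throw new IllegalArgumentException("Record too short for a Confluent header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(record); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        return new ConfluentFrame(schemaId, buffer.slice());
    }
}
```

The schema ID is what a deserializer would use to look up the descriptor in the schema registry before decoding the remaining bytes.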
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688472333 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -716,6 +695,48 @@ public CheckpointFetchResult fetchCheckpointInfo( } } +@Override +public CheckpointStatsResult fetchCheckpointStats( +String jobId, Long checkpointId, Configuration conf) { +try (RestClusterClient clusterClient = getClusterClient(conf)) { +var checkpointStatusHeaders = CheckpointStatisticDetailsHeaders.getInstance(); +var parameters = checkpointStatusHeaders.getUnresolvedMessageParameters(); +parameters.jobPathParameter.resolve(JobID.fromHexString(jobId)); + +// This was needed because the parameter is protected +var checkpointIdPathParameter = +(CheckpointIdPathParameter) Iterables.getLast(parameters.getPathParameters()); +checkpointIdPathParameter.resolve(checkpointId); + +var response = +clusterClient.sendRequest( +checkpointStatusHeaders, parameters, EmptyRequestBody.getInstance()); + +var stats = response.get(); +if (stats == null) { +throw new IllegalStateException("Checkpoint ID %d for job %s does not exist!"); +} else if (stats instanceof CheckpointStatistics.CompletedCheckpointStatistics) { +return CheckpointStatsResult.completed( +((CheckpointStatistics.CompletedCheckpointStatistics) stats) +.getExternalPath()); +} else if (stats instanceof CheckpointStatistics.FailedCheckpointStatistics) { +return CheckpointStatsResult.error( +((CheckpointStatistics.FailedCheckpointStatistics) stats) +.getFailureMessage()); +} else if (stats instanceof CheckpointStatistics.PendingCheckpointStatistics) { +return CheckpointStatsResult.pending(); +} else { +throw new IllegalArgumentException( +String.format( +"Unknown checkpoint statistics result class: %s", +stats.getClass().getSimpleName())); +} +} catch (Exception e) { +LOG.error("Exception while fetching checkpoint statistics", e); Review Comment: We 
should handle cases where the checkpoint statistics are no longer stored on the web server, and we get the following error: ``` Could not find checkpointing statistics for checkpoint 243. ```
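One possible way to recognise that situation is sketched below in Java, assuming the REST error surfaces somewhere in the exception's cause chain with the message quoted in the comment. `CheckpointStatsErrors` and `isStatsNoLongerStored` are hypothetical names, matching on message text is brittle, and this is not necessarily how the PR ended up handling it.

```java
/** Illustrative helper: detects the "statistics no longer stored" error by its message. */
final class CheckpointStatsErrors {
    // Prefix of the error message quoted in the review comment.
    static final String STATS_MISSING_MARKER = "Could not find checkpointing statistics";

    private CheckpointStatsErrors() {}

    /** Walks the cause chain and reports whether any message contains the marker. */
    static boolean isStatsNoLongerStored(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            String message = t.getMessage();
            if (message != null && message.contains(STATS_MISSING_MARKER)) {
                return true;
            }
        }
        return false;
    }
}
```

A caller inside the `catch (Exception e)` branch could use such a check to distinguish expired statistics from a genuine communication failure.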
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688463075 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java: ## @@ -53,6 +62,16 @@ private static String format(@NonNull CommonStatus status) { : status.getError()); } +private static String format(@NonNull FlinkStateSnapshotStatus status) { +if (StringUtils.isEmpty(status.getError())) { +return String.format( +">>> Status[Snapshot] | Info | %s | %s", status.getState(), status.getPath()); Review Comment: I have added some changes for this, thank you!
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1688281873 ## flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java: ## @@ -25,14 +25,6 @@ /** Covers construction, defaults and sanity checking of {@link SqsSinkBuilder}. */ class SqsSinkBuilderTest { -@Test -void elementConverterOfSinkMustBeSetWhenBuilt() { -Assertions.assertThatExceptionOfType(NullPointerException.class) -.isThrownBy(() -> SqsSink.builder().setSqsUrl("sqlUrl").build()) -.withMessageContaining( -"No SerializationSchema was supplied to the SQS Sink builder."); -} - Review Comment: Because we added a default SerializationSchema for when the customer does not provide one, we never end up throwing this error.
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
yuxiqian commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1688271544 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java: ## Review Comment: I modified the expression evaluation logic here since the previous version could not correctly handle expressions like `filter: a > 1 and a < 2` and generated duplicated argument names, which would crash the operator.
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
yuxiqian commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1688271544 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java: ## Review Comment: I modified the expression evaluation logic here since the previous version could not correctly handle expressions like `filter: a > 1 and a < 2` and generated duplicated argument names, which would crash the Janino compiler.
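As a rough illustration of the duplicated-argument problem (this is not the actual `TransformFilterProcessor` code): if every column reference in a filter becomes a parameter of the compiled expression, `a > 1 and a < 2` yields `a` twice unless the names are deduplicated first. The Java sketch below collects each referenced column once, in order of first appearance. `FilterArguments`, the regex-based tokenisation, and the `knownColumns` parameter are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: derives a duplicate-free argument list from a filter expression. */
final class FilterArguments {
    // Naive identifier pattern; a real implementation would use the SQL parser's AST.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    private FilterArguments() {}

    static List<String> referencedColumns(String expression, Set<String> knownColumns) {
        // LinkedHashSet drops repeats while keeping first-seen order.
        Set<String> seen = new LinkedHashSet<>();
        Matcher matcher = IDENTIFIER.matcher(expression);
        while (matcher.find()) {
            String token = matcher.group();
            if (knownColumns.contains(token)) {
                seen.add(token);
            }
        }
        return new ArrayList<>(seen);
    }
}
```

With this approach, the generated parameter list contains each column name once, regardless of how often the expression mentions it.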
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688262282 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java: ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.controller; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.spec.JobKind; +import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Optional; + +/** Context for reconciling a snapshot. */ +@Getter +@RequiredArgsConstructor +public class FlinkStateSnapshotContext { + +private final FlinkStateSnapshot resource; +private final FlinkStateSnapshotStatus originalStatus; +private final Context josdkContext; +private final FlinkConfigManager configManager; + +private FlinkOperatorConfiguration operatorConfig; +private Configuration referencedJobObserveConfig; +private FlinkDeployment referencedJobFlinkDeployment; + +/** + * @return Operator configuration for this resource. + */ +public FlinkOperatorConfiguration getOperatorConfig() { +if (operatorConfig != null) { +return operatorConfig; +} +return operatorConfig = +configManager.getOperatorConfiguration( +getResource().getMetadata().getNamespace(), null); Review Comment: Added lazy getter as well, thank you!
Re: [PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]
klam-shop commented on code in PR #109: URL: https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1688236979 ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java: ## @@ -144,14 +147,34 @@ public void open( valueSerialization.open(context); } +private String getTargetTopic(RowData element) { +final String topic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC); +if (topic == null && topics == null) { +throw new IllegalArgumentException( +"The topic of the sink record is not valid. Expected a single topic but no topic is set."); +} else if (topic == null && topics.size() == 1) { +return topics.get(0); +} else if (topics != null && topics.size() > 0 && !topics.contains(topic)) { Review Comment: Open to discussion: I decided to keep topics as a List, but we can change it to a HashSet. I am assuming the list will be short on average, and a List can outperform a HashSet for a small number of elements: https://stackoverflow.com/questions/150750/hashset-vs-list-performance
Re: [PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink]
klam-shop commented on PR #16142: URL: https://github.com/apache/flink/pull/16142#issuecomment-2245500551 Hi, I've taken some time to build upon [Nicholas Jiang](https://issues.apache.org/jira/secure/ViewProfile.jspa?name=nicholasjiang)'s PR and port it to the new Kafka connector repo: https://github.com/apache/flink-connector-kafka/pull/109 I would appreciate feedback on this approach.
[PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]
klam-shop opened a new pull request, #109: URL: https://github.com/apache/flink-connector-kafka/pull/109

## What is the purpose of the change
Allows writing to different Kafka topics based on the `topic` metadata column value in SQL, and updates the Table API's `KafkaDynamicSink` to accept a `List<String> topics` instead of `String topic`. The list acts as a whitelist of acceptable values for the `topic` metadata column:
- If a single topic is provided, it is used by default for the target topic to produce to
- If a list is provided, only that list of topics can be produced to
- If no list is provided, any value is accepted

Builds on the work in https://github.com/apache/flink/pull/16142 by @SteNicholas

## Brief change log
- Adds `topic` as writable metadata
- Updates Table API Kafka sink to accept `List<String> topics` instead of `String topic`, mirroring the source side.
- Implements whitelist behaviour in `DynamicKafkaRecordSerializationSchema`

## Verifying this change
- [x] Tests that the Sink Factory and related machinery works as expected
- [x] Tests the various valid and invalid scenarios for `topic` metadata in `DynamicKafkaRecordSerializationSchema`

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? updated documentation
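The three whitelist rules in the PR description can be sketched as a small standalone Java function. This is one reading of the description, not the code in `DynamicKafkaRecordSerializationSchema`; `TargetTopicResolver`, `resolve`, and the error messages are hypothetical.

```java
import java.util.List;

/** Illustrative only: resolves the target topic from the record metadata and the configured list. */
final class TargetTopicResolver {
    private TargetTopicResolver() {}

    static String resolve(String metadataTopic, List<String> topics) {
        boolean hasList = topics != null && !topics.isEmpty();
        if (metadataTopic == null) {
            // A single configured topic acts as the default target.
            if (hasList && topics.size() == 1) {
                return topics.get(0);
            }
            throw new IllegalArgumentException(
                    "No topic set on the record and no single default topic configured");
        }
        // A configured list restricts which metadata values are acceptable.
        if (hasList && !topics.contains(metadataTopic)) {
            throw new IllegalArgumentException(
                    "Topic '" + metadataTopic + "' is not in the allowed list " + topics);
        }
        // No list configured, or the value is allowed: use the metadata value as-is.
        return metadataTopic;
    }
}
```

`List.contains` is a linear scan, which fits the assumption stated in the review thread that the configured list is short; swapping in a `Set` would only change the lookup, not the rules.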
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688221106 ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkStateSnapshotState.java: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.status; + +import org.apache.flink.annotation.Experimental; + +/** Describes current snapshot state. */ +@Experimental +public enum FlinkStateSnapshotState { Review Comment: Good idea, thank you!
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
yuxiqian commented on PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2245470485 Done, rebased due to some conflicts with https://github.com/apache/flink-cdc/commit/26ff6d2a081181f3df7aa49d65d804c57c634122. Will add `CAST ... AS` tests after #3357 gets merged.
Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]
gyfora commented on code in PR #855: URL: https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688196270 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ## @@ -289,6 +289,8 @@ public boolean reconcileOtherChanges(FlinkResourceContext ctx) throws Except LOG.info("Stopping failed Flink job..."); cleanupAfterFailedJob(ctx); status.setError(null); +ReconciliationUtils.updateStatusForDeployedSpec( +ctx.getResource(), ctx.getDeployConfig(ctx.getResource().getSpec()), clock); Review Comment: Hm I see, I think you are right. I think the more straightforward and simpler fix would be to actually fix the `resubmitJob` method. In the case where we have a savepoint, it should set the upgradeMode to savepoint on the spec to recover. Otherwise it can leave it stateless. What do you think?
Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]
chenyuzhi459 commented on code in PR #855: URL: https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688138227 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ## @@ -289,6 +289,8 @@ public boolean reconcileOtherChanges(FlinkResourceContext ctx) throws Except LOG.info("Stopping failed Flink job..."); cleanupAfterFailedJob(ctx); status.setError(null); +ReconciliationUtils.updateStatusForDeployedSpec( +ctx.getResource(), ctx.getDeployConfig(ctx.getResource().getSpec()), clock); Review Comment:

Assume a Flink deployment is submitted to the flink-kubernetes-operator for the first time with the following settings:

```
spec.job.upgradeMode=savepoint
spec.job.initialSavepointPath=null
spec.flinkConfiguration.execution.checkpointing.interval=60s
```

Here is the startup and failover process of the flink-kubernetes-operator as I understand it:

1. At the first reconcile, in the method [AbstractFlinkResourceReconciler.updateStatusBeforeFirstDeployment](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L224), the `spec.job.upgradeMode` of the cloned deployment is set to STATELESS because `spec.job.initialSavepointPath` is empty. This change is not written back to the original deployment in k8s, so the original deployment's `spec.job.upgradeMode` is still savepoint. The cloned spec is then serialized into `status.reconciliationStatus.lastReconciledSpec`, and this step is synchronized to the original deployment in k8s (I haven't looked into why yet).
2. After running for a period of time, the deployment may encounter a problem and exit with a failed status. The operator saves the latest checkpoint in the `status.jobStatus.savepointInfo.lastSavepoint` of the original deployment in the method `SnapshotObserver.observeSavepointStatus`.
3. Then, in the method [AbstractJobReconciler.resubmitJob](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L315), the lastReconciledSpec of the original deployment is read into the specToRecover variable and passed to the method [AbstractJobReconciler.restore](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L260). `AbstractJobReconciler.restore` decides whether to recover from lastSavepoint based on whether the `spec.job.upgradeMode` of specToRecover is STATELESS. Before the fix, the upgradeMode here is obviously STATELESS.

Therefore, in the failover scenario, I think just serializing the original deployment's `spec.job.upgradeMode=SAVEPOINT` to `status.reconciliationStatus.lastReconciledSpec` before resubmitJob can solve this problem. I don't know if there is something wrong with my understanding. If so, I hope you can correct me. Thank you.
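The scenario above, and the alternative fix suggested in the reply on this thread (let `resubmitJob` pick the upgrade mode depending on whether a savepoint exists), can be modelled with a few lines of Java. This is a simplified sketch, not operator code: `ResubmitSketch`, both method names, and the sample savepoint path are hypothetical, and only the STATELESS check described in step 3 is modelled.

```java
import java.util.Optional;

/** Illustrative model of the restore decision discussed in this thread. */
final class ResubmitSketch {
    enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

    private ResubmitSketch() {}

    /** Step 3 above: the last savepoint is only used when the recorded mode is not STATELESS. */
    static Optional<String> savepointToRestore(UpgradeMode recordedMode, Optional<String> lastSavepoint) {
        return recordedMode == UpgradeMode.STATELESS ? Optional.empty() : lastSavepoint;
    }

    /** Suggested fix: choose the mode for the spec to recover from the savepoint's presence. */
    static UpgradeMode modeForResubmit(Optional<String> lastSavepoint) {
        return lastSavepoint.isPresent() ? UpgradeMode.SAVEPOINT : UpgradeMode.STATELESS;
    }
}
```

With a recorded mode of STATELESS the first method drops an existing savepoint, which is the reported bug; feeding it the result of `modeForResubmit` restores from the savepoint when one exists and stays stateless otherwise.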
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688136504 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java: ## @@ -273,15 +280,19 @@ private void disposeSavepointQuietly( } } -private void observeLatestSavepoint( +private void observeLatestCheckpoint( Review Comment: Yep, this will be the latest checkpoint retrieved via the Flink REST API. I have updated the log.
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688126716 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java: ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.controller; + +import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; +import org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler; +import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.EventSourceUtils; +import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils; +import org.apache.flink.kubernetes.operator.utils.StatusRecorder; +import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator; + +import io.javaoperatorsdk.operator.api.reconciler.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** Controller that runs the main reconcile loop for {@link FlinkStateSnapshot}. 
*/ +@ControllerConfiguration +public class FlinkStateSnapshotController +implements Reconciler, +ErrorStatusHandler, +EventSourceInitializer, +Cleaner { + +private static final Logger LOG = LoggerFactory.getLogger(FlinkStateSnapshotController.class); + +private final Set validators; +private final FlinkResourceContextFactory ctxFactory; +private final StateSnapshotReconciler reconciler; +private final StateSnapshotObserver observer; +private final EventRecorder eventRecorder; +private final StatusRecorder statusRecorder; + +public FlinkStateSnapshotController( +Set validators, +FlinkResourceContextFactory ctxFactory, +StateSnapshotReconciler reconciler, +StateSnapshotObserver observer, +EventRecorder eventRecorder, +StatusRecorder statusRecorder) { +this.validators = validators; +this.ctxFactory = ctxFactory; +this.reconciler = reconciler; +this.observer = observer; +this.eventRecorder = eventRecorder; +this.statusRecorder = statusRecorder; +} + +@Override +public UpdateControl reconcile( +FlinkStateSnapshot flinkStateSnapshot, Context josdkContext) { +// status might be null here +flinkStateSnapshot.setStatus( +Objects.requireNonNullElseGet( +flinkStateSnapshot.getStatus(), FlinkStateSnapshotStatus::new)); +var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot, josdkContext); + +observer.observe(ctx); + +if (validateSnapshot(ctx)) { +reconciler.reconcile(ctx); +} + +notifyListeners(ctx); +return getUpdateControl(ctx); +} + +@Override +public DeleteControl cleanup( +FlinkStateSnapshot flinkStateSnapshot, Context josdkContext) { +var ctx
Re: [PR] [FLINK-35292] Set dummy savepoint path during last-state upgrade [flink-kubernetes-operator]
gyfora commented on code in PR #849: URL: https://github.com/apache/flink-kubernetes-operator/pull/849#discussion_r1688103240 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ## @@ -265,7 +274,7 @@ protected void restoreJob( throws Exception { Optional savepointOpt = Optional.empty(); -if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) { +if (spec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT) { Review Comment: The problem is that this cannot be done easily, because which savepoint is passed there depends on where and why deploy is called. It is, for example, different during initial deployments vs upgrades. While the optional parameter is not great, I would like to avoid adding more complexity to the deploy method. It may make sense to actually break up the deploy method into 3 parts (stateless / last-state / savepoint), which would get rid of the optional params and the requireHaMetadata flag at the same time, at the expense of more methods.
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688104509 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java: ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.controller; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; +import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.spec.JobKind; +import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Optional; + +/** Context for reconciling a snapshot. */ +@Getter +@RequiredArgsConstructor +public class FlinkStateSnapshotContext { + +private final FlinkStateSnapshot resource; +private final FlinkStateSnapshotStatus originalStatus; +private final Context josdkContext; +private final FlinkConfigManager configManager; + +private FlinkOperatorConfiguration operatorConfig; +private Configuration referencedJobObserveConfig; +private FlinkDeployment referencedJobFlinkDeployment; + +/** + * @return Operator configuration for this resource. + */ +public FlinkOperatorConfiguration getOperatorConfig() { +if (operatorConfig != null) { +return operatorConfig; +} +return operatorConfig = +configManager.getOperatorConfiguration( +getResource().getMetadata().getNamespace(), null); Review Comment: I have flipped the ifs for now; I will come back to check whether a Lombok lazy getter can be used here.
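As a rough illustration of the pattern discussed here, the sketch below shows one reading of "flipping the ifs": initialize the field when it is null, then fall through to a single return. `LazyConfigHolder` and its `Supplier`-based loader are hypothetical stand-ins, not classes from the PR. Lombok's `@Getter(lazy = true)` is the alternative mentioned in the comment; it requires a `private final` field with an initializer expression, so whether it fits this class would need checking.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a context object with a lazily loaded field. */
final class LazyConfigHolder {
    // Hypothetical loader; in the PR the value comes from a config manager.
    private final Supplier<String> loader;

    // Cached value, populated on first access.
    private String operatorConfig;

    LazyConfigHolder(Supplier<String> loader) {
        this.loader = loader;
    }

    // Initialize on first use, then always return the cached value.
    // Note: this sketch is not thread-safe.
    String getOperatorConfig() {
        if (operatorConfig == null) {
            operatorConfig = loader.get();
        }
        return operatorConfig;
    }
}
```

With this shape the loader runs at most once per holder in single-threaded use, and the assignment-inside-return from the diff is no longer needed.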
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688100790 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ## @@ -426,6 +438,14 @@ public static String operatorConfigKey(String key) { .withDescription( "Max allowed checkpoint age for initiating last-state upgrades on running jobs. If a checkpoint is not available within the desired age (and nothing in progress) a savepoint will be triggered."); +@Documentation.Section(SECTION_DYNAMIC) +public static final ConfigOption OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE = +operatorConfig("savepoint.dispose-on-delete") +.booleanType() +.defaultValue(false) +.withDescription( +"Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted."); Review Comment: I believe it's the correct way to say it (to dispose of something)
Re: [PR] [FLINK-32682] Make it possible to use query time based time functions in streaming mode [flink]
dawidwys closed pull request #23083: [FLINK-32682] Make it possible to use query time based time functions in streaming mode URL: https://github.com/apache/flink/pull/23083
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688087051 ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java: ## @@ -90,8 +100,8 @@ public class JobSpec implements Diffable { /** * Nonce used to trigger a full redeployment of the job from the savepoint path specified in - * initialSavepointPath. In order to trigger redeployment, change the number to a different - * non-null value. Rollback is not possible after redeployment. + * initialSavepointPath or initialSavepointName. In order to trigger redeployment, change the Review Comment: Forgot to rename it
Re: [PR] [FLINK-35776] Simplify job status handling [flink-kubernetes-operator]
gyfora merged PR #851: URL: https://github.com/apache/flink-kubernetes-operator/pull/851
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2245153099 @19priyadhingra The tests seem to be failing. Can we please take a look? Also, it would be good if we squash the commits!
Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]
gyfora commented on code in PR #855: URL: https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1687973449 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java: ## @@ -289,6 +289,8 @@ public boolean reconcileOtherChanges(FlinkResourceContext ctx) throws Except LOG.info("Stopping failed Flink job..."); cleanupAfterFailedJob(ctx); status.setError(null); +ReconciliationUtils.updateStatusForDeployedSpec( +ctx.getResource(), ctx.getDeployConfig(ctx.getResource().getSpec()), clock); Review Comment: I don't really understand how this would solve the issue... Could you please explain why you think it solves it?
Re: [PR] [FLINK-35242] Supports per-SE type configuration & "lenient" evolution behavior [flink-cdc]
leonardBang commented on code in PR #3339: URL: https://github.com/apache/flink-cdc/pull/3339#discussion_r1687943729 ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java: ## @@ -30,5 +33,5 @@ public interface DataSink { EventSinkProvider getEventSinkProvider(); /** Get the {@link MetadataApplier} for applying metadata changes to external systems. */ -MetadataApplier getMetadataApplier(); +MetadataApplier getMetadataApplier(Set enabledEventTypes); Review Comment: This is a public API; we should keep backward compatibility. ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java: ## @@ -19,13 +19,22 @@ import org.apache.flink.cdc.common.annotation.PublicEvolving; import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import java.io.Serializable; +import java.util.Set; /** {@code MetadataApplier} is used to apply metadata changes to external systems. */ @PublicEvolving public interface MetadataApplier extends Serializable { +/** Checks if this metadata applier should handle this event type. */ +boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType); + +/** Checks what kind of schema change events downstream can handle. */ +Set getSupportedSchemaEvolutionTypes(); + Review Comment: Please consider compatibility when changing an existing public API. Imagine a user who has a custom pipeline connector: will it still work after they bump the Flink CDC version? ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java: ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.exceptions; + +import org.apache.flink.cdc.common.event.SchemaChangeEvent; + +import javax.annotation.Nullable; + +/** An exception occurred during schema evolution. */ +public class SchemaEvolveException extends Exception { Review Comment: Should this extend `FlinkRuntimeException` or `CDCRuntimeException`? ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSink.java: ## @@ -74,7 +76,7 @@ public EventSinkProvider getEventSinkProvider() { } @Override -public MetadataApplier getMetadataApplier() { -return new DorisMetadataApplier(dorisOptions, configuration); +public MetadataApplier getMetadataApplier(Set enabledEventTypes) { +return new DorisMetadataApplier(dorisOptions, configuration, enabledEventTypes); } Review Comment: You need to change all connectors in one PR if you introduce an incompatible change to a public API; we should avoid this kind of change. ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java: ## @@ -22,7 +22,9 @@ /** Behavior for handling schema changes. 
*/ @PublicEvolving public enum SchemaChangeBehavior { -EVOLVE, IGNORE, Review Comment: you can add a note about the order adjustment ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/metrics/SchemaOperatorMetrics.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions
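One common way to address the compatibility concern raised in these comments is to add new interface methods as `default` methods, so that existing implementations keep compiling unchanged. The sketch below is only an illustration of that technique under assumed names: `EventTypeSketch`, `MetadataApplierSketch`, and `LegacyApplier` are hypothetical, and the chosen defaults (accept every type) are an assumption, not what the PR ended up doing.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical event types, standing in for the real schema change event types. */
enum EventTypeSketch { ADD_COLUMN, DROP_COLUMN, RENAME_COLUMN }

/** Hypothetical interface showing new methods added with defaults. */
interface MetadataApplierSketch {
    // Pre-existing abstract method that every implementation already has.
    void apply(String event);

    // Newly added: defaulting to "accept everything" keeps old behavior.
    default boolean acceptsSchemaEvolutionType(EventTypeSketch type) {
        return true;
    }

    // Newly added: by default, advertise support for all event types.
    default Set<EventTypeSketch> getSupportedSchemaEvolutionTypes() {
        return EnumSet.allOf(EventTypeSketch.class);
    }
}

/** A connector written before the new methods existed; it still compiles. */
final class LegacyApplier implements MetadataApplierSketch {
    @Override
    public void apply(String event) {
        // No-op for the purposes of this sketch.
    }
}
```

A custom connector such as `LegacyApplier` would continue to work after a version bump, while newer connectors can override the defaults to narrow what they accept.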
[PR] [FLINK-35884] MySQL pipeline support snapshot chunk key-column [flink-cdc]
beryllw opened a new pull request, #3490: URL: https://github.com/apache/flink-cdc/pull/3490 The flink-connector-mysql-cdc `MySqlSourceOptions` supports the parameter `scan.incremental.snapshot.chunk.key-column` for dividing chunks; the pipeline connector should support it as well.
Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]
yuxiqian commented on PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2245079611 I'll investigate this.
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
ferenc-csaky commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1687799832 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ## @@ -426,6 +438,14 @@ public static String operatorConfigKey(String key) { .withDescription( "Max allowed checkpoint age for initiating last-state upgrades on running jobs. If a checkpoint is not available within the desired age (and nothing in progress) a savepoint will be triggered."); +@Documentation.Section(SECTION_DYNAMIC) +public static final ConfigOption OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE = +operatorConfig("savepoint.dispose-on-delete") +.booleanType() +.defaultValue(false) +.withDescription( +"Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted."); Review Comment: nit: "...savepoints will be disposed ~of~ automatically when..." ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkStateSnapshotState.java: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.status; + +import org.apache.flink.annotation.Experimental; + +/** Describes current snapshot state. */ +@Experimental +public enum FlinkStateSnapshotState { Review Comment: Maybe move this inside the `FlinkStateSnapshotStatus`, and then the enum name could be simply `State`. This might improve the overall readability, and we can work around the somewhat confusing wording here a bit better. I see that `ReconciliationStatus` and `ReconciliationState` follow the current setup and this is just set up the same way, but the wording is straightforward there. ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ## @@ -297,6 +297,18 @@ public static String operatorConfigKey(String key) { "Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. " + "Expected format: headerKey1:headerValue1,headerKey2:headerValue2."); +@Documentation.Section(SECTION_DYNAMIC) +public static final ConfigOption SNAPSHOT_RESOURCE_ENABLED = +operatorConfig("snapshot.resource.enabled") +.booleanType() +.defaultValue(true) +.withDescription( +"Create new FlinkStateSnapshot resources for storing snapshots. " ++ "Disable if you wish to use the deprecated mode and save snapshot results to " ++ "FlinkDeployment/FlinkSessionJob status fields. The Operator will fallback to the " Review Comment: nit: "The Operator will fallback to ~the~" ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java: ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +
Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]
leonardBang commented on code in PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489#discussion_r1687920162 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml: ## @@ -69,7 +69,7 @@ limitations under the License. org.mongodb mongodb-driver-sync -4.9.1 +5.1.1 Review Comment: Could you also update the docs? The older (release-3.1) docs also seem to use an incorrect driver version: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
leonardBang commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1687881675 ## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql: ## @@ -0,0 +1,28 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +DROP TABLE IF EXISTS DATA_TYPES_TABLE; + +CREATE TABLE DATA_TYPES_TABLE ( +ID INT NOT NULL, +TS DATETIME(0), +NUM DECIMAL(10, 2), +PRIMARY KEY (ID) +); Review Comment: The types here are not very complex from my point of view; could you refer to `fullTypesTest` of the MySQL CDC Source? ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java: ## @@ -56,6 +67,181 @@ public static List createFieldGetters(List colum return fieldGetters; } +/** Restore original data fields from RecordData structure. */ +public static List restoreOriginalData( +@Nullable RecordData recordData, List fieldGetters) { +if (recordData == null) { +return Collections.emptyList(); +} +List actualFields = new ArrayList<>(); +for (RecordData.FieldGetter fieldGetter : fieldGetters) { +actualFields.add(fieldGetter.getFieldOrNull(recordData)); +} +return actualFields; +} + +/** Merge compatible upstream schemas. 
*/ +public static Schema mergeCompatibleSchemas(List schemas) { +if (schemas.isEmpty()) { +return null; +} else if (schemas.size() == 1) { +return schemas.get(0); +} else { +Schema outputSchema = null; +for (Schema schema : schemas) { +outputSchema = mergeSchema(outputSchema, schema); +} +return outputSchema; +} +} + +/** Try to combine two schemas with potential incompatible type. */ +@VisibleForTesting +public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) { +if (lSchema == null) { +return rSchema; +} +if (lSchema.getColumnCount() != rSchema.getColumnCount()) { +throw new IllegalStateException( +String.format( +"Unable to merge schema %s and %s with different column counts.", +lSchema, rSchema)); +} +if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) { +throw new IllegalStateException( +String.format( +"Unable to merge schema %s and %s with different primary keys.", +lSchema, rSchema)); +} +if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) { +throw new IllegalStateException( +String.format( +"Unable to merge schema %s and %s with different partition keys.", +lSchema, rSchema)); +} +if (!lSchema.options().equals(rSchema.options())) { +throw new IllegalStateException( +String.format( +"Unable to merge schema %s and %s with different options.", +lSchema, rSchema)); +} +if (!Objects.equals(lSchema.comment(), rSchema.comment())) { +throw new IllegalStateException( +String.format( +"Unable to merge schema %s and %s with different comments.", +lSchema, rSchema)); +} + +List leftColumns = lSchema.getColumns(); +List rightColumns = rSchema.getColumns(); + +List mergedColumns = +IntStream.range(0, lSchema.getColumnCount()) +.mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i))) +.collect(Collectors.toList()); + +return lSchema.copy(mergedColumns); +} + +/** Try to combine two columns with potential incompatible type. 
*/ +@VisibleForTesting +public static Column mergeColumn(Column lColumn, Column rColumn) { +if (!Objects.equals(lColumn.getName(), rColumn.getName())) { +
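The quoted diff folds a list of schemas pairwise and merges columns position by position. A stripped-down sketch of that shape is given below, with schemas reduced to lists of type names. `SchemaMergeSketch` is a hypothetical class, and the column rule used here (fall back to `"STRING"` when two types differ) is an invented placeholder, not the rule implemented in the PR, whose `mergeColumn` body is cut off above.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical sketch of a pairwise schema fold; not the PR's SchemaUtils. */
final class SchemaMergeSketch {
    private SchemaMergeSketch() {}

    // Fold all schemas into one; returns null for an empty input,
    // mirroring the behavior visible in the quoted diff.
    static List<String> mergeAll(List<List<String>> schemas) {
        List<String> merged = null;
        for (List<String> schema : schemas) {
            merged = merge(merged, schema);
        }
        return merged;
    }

    // Merge two schemas column by column; column counts must match.
    static List<String> merge(List<String> left, List<String> right) {
        if (left == null) {
            return right;
        }
        if (left.size() != right.size()) {
            throw new IllegalStateException(
                    String.format("Unable to merge schema %s and %s with different column counts.",
                            left, right));
        }
        return IntStream.range(0, left.size())
                .mapToObj(i -> mergeType(left.get(i), right.get(i)))
                .collect(Collectors.toList());
    }

    // Placeholder rule (an assumption): equal types stay, others widen to STRING.
    static String mergeType(String left, String right) {
        return left.equals(right) ? left : "STRING";
    }
}
```

Seeding the fold with `null` lets the first schema pass through unchanged, which is why `merge` treats a null left-hand side as "nothing merged yet".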
Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
flinkbot commented on PR #25113: URL: https://github.com/apache/flink/pull/25113#issuecomment-2244834549 ## CI report: * 7e539f5fc908f38eebba16f9480bcf3f5d5009c3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
RocMarshal opened a new pull request, #25113: URL: https://github.com/apache/flink/pull/25113 ## What is the purpose of the change Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler ## Brief change log - introduce slot.request.max-interval - implement the mechanism. ## Verifying this change This change is already covered by added corresponding test cases. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)
Re: [PR] [FLINK-35874][cdc-connector][mysql] Check pureBinlogPhaseTables set before call getBinlogPosition method [flink-cdc]
qiaozongmi commented on PR #3488: URL: https://github.com/apache/flink-cdc/pull/3488#issuecomment-2244820245 @leonardBang PTAL
Re: [PR] [FLINK-35877] Shade protobuf in flink [flink]
zhuanshenbsj1 commented on PR #25112: URL: https://github.com/apache/flink/pull/25112#issuecomment-2244784197 > Which part of the Flink runtime/system relies on Protobuf, that would justify shading Protobuf? This is mainly to avoid dependency conflicts between the Protobuf used in the developer's code and Flink itself. Protobuf is very commonly used in development.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1687687114 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java: ## @@ -69,46 +69,29 @@ class LocalInputPreferredSlotSharingStrategyTest { private TestingSchedulingExecutionVertex ev21; private TestingSchedulingExecutionVertex ev22; -private Set slotSharingGroups; - -@BeforeEach -void setUp() { -topology = new TestingSchedulingTopology(); - +private void setupCase() { ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0); ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1); ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0); ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1); -final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); -slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_1); -slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_2); -slotSharingGroups = Collections.singleton(slotSharingGroup); +slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_1); +slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_2); } -@Test -void testCoLocationConstraintIsRespected() { -topology.connect(ev11, ev22); -topology.connect(ev12, ev21); - -final CoLocationGroup coLocationGroup = -new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2); -final Set coLocationGroups = Collections.singleton(coLocationGroup); - -final SlotSharingStrategy strategy = -new LocalInputPreferredSlotSharingStrategy( -topology, slotSharingGroups, coLocationGroups); - -assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2); - assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds()) -.contains(ev11.getId(), ev21.getId()); - assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getExecutionVertexIds()) -.contains(ev12.getId(), ev22.getId()); +@Override +protected SlotSharingStrategy getSlotSharingStrategy( +SchedulingTopology topology, +Set slotSharingGroups, +Set coLocationGroups) { 
+return new LocalInputPreferredSlotSharingStrategy( +topology, slotSharingGroups, coLocationGroups); } @Test void testInputLocalityIsRespectedWithRescaleEdge() { +setupCase(); Review Comment: It's a little weird to call `setupCase` in so many tests. Could we define a `protected abstract void doSetup();` in `AbstractSlotSharingStrategyTest` and have `setUp` call `doSetup()`? ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to + * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location + *
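The `doSetup()` suggestion in the review comment above is a template-method arrangement: the abstract base test owns `setUp` and calls a subclass hook, so individual tests no longer call `setupCase()` themselves. A minimal sketch without JUnit is shown below; `AbstractStrategyTestSketch` and `LocalInputStrategyTestSketch` are hypothetical names, and a plain list of strings stands in for the testing topology. In a real JUnit 5 test, `setUp` would carry `@BeforeEach`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base test; setUp() would be annotated with @BeforeEach in JUnit 5. */
abstract class AbstractStrategyTestSketch {
    // Stand-in for the shared testing topology.
    protected final List<String> topology = new ArrayList<>();

    // Common setup runs first, then the subclass-specific hook.
    final void setUp() {
        topology.clear();
        topology.add("base");
        doSetup();
    }

    // Hook that each concrete test class implements once.
    protected abstract void doSetup();
}

/** Hypothetical concrete test class providing its own vertices. */
final class LocalInputStrategyTestSketch extends AbstractStrategyTestSketch {
    @Override
    protected void doSetup() {
        topology.add("ev11");
        topology.add("ev12");
    }
}
```

Because the hook runs as part of `setUp`, every test method in the subclass starts from the same prepared state without repeating a setup call.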
Re: [PR] [FLINK-35623] Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0 [flink-connector-mongodb]
Jiabao-Sun merged PR #36: URL: https://github.com/apache/flink-connector-mongodb/pull/36 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]
yuxiqian commented on PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244705856 > Shall we bump the driver version from 4.7.1 to 5.1.1 as well? Done, bumped `mongo-kafka` version, too.
Re: [PR] [FLINK-35868][cdc-connector][mongodb] Add MongoDB 6.0 & 7.0 tests [flink-cdc]
Jiabao-Sun commented on PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244672599 Shall we bump the driver version from 4.7.1 to 5.1.1 as well?
Re: [PR] [FLINK-35623] Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0 [flink-connector-mongodb]
Jiabao-Sun commented on PR #36: URL: https://github.com/apache/flink-connector-mongodb/pull/36#issuecomment-2244667146 Hi @yux, could you help review this?
Re: [PR] [FLINK-35737] Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown [flink]
fengjiajie commented on PR #25009: URL: https://github.com/apache/flink/pull/25009#issuecomment-2244648203 Hi @Samrat002, I was wondering if you had a chance to take another look at this PR. Please let me know if any further changes are needed. Thanks!
Re: [PR] [FLINK-35877] Shade protobuf in flink [flink]
flinkbot commented on PR #25112: URL: https://github.com/apache/flink/pull/25112#issuecomment-2244623653 ## CI report: * 8a98ca6989b8a1192f72c4d0fd58529e99f9bfd5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1687672586 ## flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java: ## @@ -25,14 +25,6 @@ /** Covers construction, defaults and sanity checking of {@link SqsSinkBuilder}. */ class SqsSinkBuilderTest { -@Test -void elementConverterOfSinkMustBeSetWhenBuilt() { -Assertions.assertThatExceptionOfType(NullPointerException.class) -.isThrownBy(() -> SqsSink.builder().setSqsUrl("sqlUrl").build()) -.withMessageContaining( -"No SerializationSchema was supplied to the SQS Sink builder."); -} - Review Comment: Why did we remove this unit test?
[PR] [FLINK-35877] Shade protobuf in flink [flink]
zhuanshenbsj1 opened a new pull request, #25112: URL: https://github.com/apache/flink/pull/25112 ## What is the purpose of the change Shade the classes in protobuf to avoid class conflict. ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / 
no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[PR] [FLINK-35868][mongo] Add MongoDB 6.0 & 7.0 tests [flink-cdc]
yuxiqian opened a new pull request, #3489: URL: https://github.com/apache/flink-cdc/pull/3489 This closes FLINK-35868. It allows Mongo CDC test cases to run on 6.0.16 & 7.0.12 (and legacy 5.0.2). Note: since JUnit doesn't allow parameterized `@ClassRule` or static fields, containers must be created and destroyed before and after each test case, which significantly slows down testing (~50 min.). More discussion is required on this.
Re: [PR] [FLINK-35868][mongo] Add MongoDB 6.0 & 7.0 tests [flink-cdc]
yuxiqian commented on PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244493624 @leonardBang @Jiabao-Sun PTAL
Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]
XComp commented on code in PR #25027: URL: https://github.com/apache/flink/pull/25027#discussion_r1687470490 ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java: ## @@ -83,9 +83,15 @@ public Collection getJobManagerRunners() { } @Override -public CompletableFuture localCleanupAsync(JobID jobId, Executor unusedExecutor) { +public CompletableFuture localCleanupAsync(JobID jobId, Executor mainThreadExecutor) { if (isRegistered(jobId)) { -return unregister(jobId).closeAsync(); +CompletableFuture resultFuture = this.jobManagerRunners.get(jobId).closeAsync(); + +return resultFuture.thenApply( Review Comment: ```suggestion return resultFuture.thenApplyAsync( ``` ## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java: ## @@ -30,16 +30,17 @@ /** {@code TestingResourceCleanerFactory} for adding custom {@link ResourceCleaner} creation. */ public class TestingResourceCleanerFactory implements ResourceCleanerFactory { -private final Collection locallyCleanableResources; +private final Collection +locallyCleanableInMainThreadResource; private final Collection globallyCleanableResources; private final Executor cleanupExecutor; private TestingResourceCleanerFactory( -Collection locallyCleanableResources, +Collection locallyCleanableInMainThreadResource, Review Comment: `LocallyCleanableInMainThreadResource` is an implementation detail of the `DispatcherResourceCleanerFactory`. We don't have to use it in `TestingResourceCleanerFactory`. ## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java: ## Review Comment: I'm wondering whether you could come up with a test where we verify that the close is called in another thread and the unregister is then executed in the main thread. 
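As an illustration of the `thenApply` versus `thenApplyAsync` suggestion above, here is a minimal sketch using only `java.util.concurrent`. It is not the Flink implementation: the class name, the `closeThenUnregister` helper, and the thread names `io-thread` and `main-thread` are hypothetical stand-ins for the runner's close operation and the Dispatcher main thread. It shows that passing an executor to `thenApplyAsync` pins the follow-up step to that executor regardless of which thread completed the first future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class ThenApplyAsyncSketch {

    /**
     * Runs a stand-in "close" step on ioExecutor, then a stand-in "unregister"
     * step on mainThreadExecutor. Returns {closeThreadName, unregisterThreadName}.
     */
    static String[] closeThenUnregister(Executor ioExecutor, Executor mainThreadExecutor) {
        AtomicReference<String> closeThread = new AtomicReference<>();

        // Hypothetical stand-in for closeAsync(): completes on the I/O executor.
        CompletableFuture<Void> closeFuture =
                CompletableFuture.runAsync(
                        () -> closeThread.set(Thread.currentThread().getName()), ioExecutor);

        // thenApplyAsync with an explicit executor always runs the callback there.
        // Plain thenApply would run it on whichever thread completes closeFuture
        // (or on the calling thread if the future is already complete).
        String unregisterThread =
                closeFuture
                        .thenApplyAsync(
                                ignored -> Thread.currentThread().getName(), mainThreadExecutor)
                        .join();

        return new String[] {closeThread.get(), unregisterThread};
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            String[] threads = closeThenUnregister(io, main);
            System.out.println("close ran on: " + threads[0]);
            System.out.println("unregister ran on: " + threads[1]);
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }
}
```

A test along the lines requested in the last review comment could record thread names in the same way and assert on them.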
## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableInMainThreadResource.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * {@code LocallyCleanableInMainThreadResource} is supposed to be implemented by any class that + * provides artifacts for a given job that need to be cleaned up in the main thread after the job + * reached a local terminal state. Local cleanups that needs to be triggered for a global terminal + * state as well, need to be implemented using the {@link GloballyCleanableResource}. + * + * The {@link DispatcherResourceCleanerFactory} provides a workaround to trigger some {@code + * LocallyCleanableInMainThreadResource} as globally cleanable. FLINK-26175 is created to cover a + * refactoring and straighten things out. 
+ * + * @see org.apache.flink.api.common.JobStatus + */ +@FunctionalInterface +public interface LocallyCleanableInMainThreadResource { + +/** + * {@code localCleanupAsync} is expected to be called from the main thread and uses the passed + * {@code mainThreadExecutor} for cleanups. Thread-safety must be ensured. + * + * @param jobId The {@link JobID} of the job for which the local data should be cleaned up. + * @param mainThreadExecutor The main thread executor. + * @return The cleanup result future. + */ +CompletableFuture localCleanupAsync(JobID jobId, Executor mainThreadExecutor); Review Comment: We shouldn't remove the `ioExecutor` from the signature because we still need to translate `LocallyCleanableInMainThreadResource` into `LocallyCleanableResource` (see the proposal in [my comment](https://github.com/apache/flink/pull/25027#discussion_r1683903323)). ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java: ## @@ -126,8 +129,24 @@ JOB_MANAGER_METRIC_GROUP_LABEL,
Re: [PR] [BP-1.20][FLINK-35095][test] Fix unstable tests in `ExecutionEnvironmentImplTest` [flink]
flinkbot commented on PR #25111: URL: https://github.com/apache/flink/pull/25111#issuecomment-2244299273 ## CI report: * 599839b334cb52e6159912ccb2162e314ce38198 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [BP-1.20][FLINK-35095][test] Fix unstable tests in `ExecutionEnvironmentImplTest` [flink]
reswqa opened a new pull request, #25111: URL: https://github.com/apache/flink/pull/25111 (no comment)
Re: [PR] [FLINK-23598] [core] Fix a bug that caused position to be updated twice when writing a string to a byte array [flink]
sn-12-3 commented on PR #23563: URL: https://github.com/apache/flink/pull/23563#issuecomment-2244234183 Hello, could someone please clarify whether this PR requires any further modifications? It has been open for quite some time now.
Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]
leonardBang commented on PR #2944: URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2244218226 @lvyanquan Could you check the failed tests? It's also recommended to rebase onto master.
Re: [PR] [FLINK-35713][cdc-compose] Add sink PARALLELISM for flink-cdc. [flink-cdc]
ruanhang1993 commented on code in PR #3438: URL: https://github.com/apache/flink-cdc/pull/3438#discussion_r1687392535 ## flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java: ## @@ -93,6 +93,7 @@ private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking @Override public PipelineExecution compose(PipelineDef pipelineDef) { int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM); +int sinkParallelism = pipelineDef.getConfig().get(PipelineOptions.SINK_PARALLELISM); Review Comment: What if users do not set the SINK_PARALLELISM setting? ## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java: ## @@ -45,6 +45,15 @@ public class PipelineOptions { .noDefaultValue() .withDescription("Parallelism of the pipeline"); + + +public static final ConfigOption SINK_PARALLELISM = +ConfigOptions.key("sink.parallelism") +.intType() +.noDefaultValue() +.withDescription("Parallelism of the sink in the pipeline"); Review Comment: Please add some tests for the setting.
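To make the first review question on PR #3438 concrete: if reading an unset option with `noDefaultValue()` yields `null` (an assumption about the configuration API, not verified here), assigning the result to a primitive `int` would fail on unboxing. The sketch below models the configuration as a plain `Map` instead of the real Flink CDC classes. The class name, the helper, and the pipeline key `"parallelism"` are hypothetical; only `"sink.parallelism"` comes from the diff. One possible way to handle the unset case is to fall back to the pipeline parallelism:

```java
import java.util.HashMap;
import java.util.Map;

public class SinkParallelismSketch {
    // Hypothetical key for the pipeline-wide setting; the diff does not show it.
    static final String PIPELINE_KEY = "parallelism";
    // Key taken from the SINK_PARALLELISM option in the diff under review.
    static final String SINK_KEY = "sink.parallelism";

    /** Returns the sink parallelism, falling back to the pipeline parallelism when unset. */
    static int resolveSinkParallelism(Map<String, Integer> config) {
        int pipelineParallelism = config.getOrDefault(PIPELINE_KEY, 1);
        // Map.get returns null for a missing key. Writing
        //   int sinkParallelism = config.get(SINK_KEY);
        // would throw a NullPointerException on unboxing in that case.
        Integer sinkParallelism = config.get(SINK_KEY);
        return sinkParallelism != null ? sinkParallelism : pipelineParallelism;
    }

    public static void main(String[] args) {
        Map<String, Integer> config = new HashMap<>();
        config.put(PIPELINE_KEY, 4);
        System.out.println(resolveSinkParallelism(config)); // sink option unset

        config.put(SINK_KEY, 2);
        System.out.println(resolveSinkParallelism(config)); // sink option set
    }
}
```

Whether falling back to the pipeline parallelism is the desired behavior is a design decision for the PR; the sketch only shows that the unset case needs explicit handling and is easy to cover with a test.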
Re: [PR] [FLINK-31931][runtime] Prevent exception history page from linking to non-existent TM page. [flink]
JunRuiLee closed pull request #22542: [FLINK-31931][runtime] Prevent exception history page from linking to non-existent TM page. URL: https://github.com/apache/flink/pull/22542
Re: [PR] [minor][cdc-connector][oracle] OracleSchema getTableSchema Description Modified [flink-cdc]
leonardBang merged PR #3443: URL: https://github.com/apache/flink-cdc/pull/3443
Re: [PR] [FLINK-35766][streaming] Fix hang issue in JobGraph generation when StreamGraph contains many YieldingOperatorFactory. [flink]
flinkbot commented on PR #25110: URL: https://github.com/apache/flink/pull/25110#issuecomment-2244169719 ## CI report: * e83bc90e32841a76be85d96d25e1098e9276c50a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-35766][streaming] Fix hang issue in JobGraph generation when StreamGraph contains many YieldingOperatorFactory. [flink]
JunRuiLee opened a new pull request, #25110: URL: https://github.com/apache/flink/pull/25110 ## What is the purpose of the change Fix hang issue in JobGraph generation when StreamGraph contains many YieldingOperatorFactory. ## Brief change log Add a cache to store the mapping between StreamNode and its head operator ## Verifying this change This change added tests and can be verified by StreamingJobGraphGeneratorTest#testJobGraphGenerationWithManyYieldingOperatorsDoesNotHang. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]
shiyiky commented on code in PR #3432: URL: https://github.com/apache/flink-cdc/pull/3432#discussion_r1687363640 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java: ## @@ -240,12 +240,15 @@ private static Map querySystemVariables( return variables; } -public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) { +public static BinlogOffset findBinlogOffset( +long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) { Review Comment: I thought so at first, but since this is a method in a util class, it may change later, and more of the attributes carried by the config class may be needed in the future.
Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]
whhe commented on code in PR #3432: URL: https://github.com/apache/flink-cdc/pull/3432#discussion_r1687359553 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java: ## @@ -240,12 +240,15 @@ private static Map querySystemVariables( return variables; } -public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) { +public static BinlogOffset findBinlogOffset( +long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) { Review Comment: How about passing the serverId directly? In this way the method signature is also clearer and easier to understand.
Re: [PR] [FLINK-35865][base] Support Byte and Short in ObjectUtils [flink-cdc]
GOODBOY008 merged PR #3481: URL: https://github.com/apache/flink-cdc/pull/3481
Re: [PR] [cdc-runtime]SchemaRegistry Log output is repeated [flink-cdc]
ChengJie1053 commented on PR #3487: URL: https://github.com/apache/flink-cdc/pull/3487#issuecomment-2244140620 @PatrickRen Hi, could you take a look at this code for me?
Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]
whhe commented on PR #3432: URL: https://github.com/apache/flink-cdc/pull/3432#issuecomment-2244137919 > Maybe we should consider fixing the BinaryLogClient with a LifecycleListener in a new PR, because another PR also needs this fix: https://github.com/apache/flink-cdc/pull/1915/ +1, creating a new PR for it makes sense to me.
Re: [PR] [FLINK-35363] FLIP-449: Reorganization of flink-connector-jdbc [flink-connector-jdbc]
RocMarshal commented on PR #123: URL: https://github.com/apache/flink-connector-jdbc/pull/123#issuecomment-2244122538 > Note: Before the merge, we can consider removing the backward-compatibility module (and workflow) to avoid generating the jar, as it was only meant to be a helper. Hi @eskabetxe, considering that this is a relatively large refactoring, retaining this module could also help developers quickly verify and fix compatibility-related issues. How about keeping it for one or two more release cycles? In other words, keep this module until the refactored code becomes stable and mature. Please let me know what you think.
Re: [PR] [hotfix][docs] Add missing "Submitting a Flink job" section in Chinese version doc and other fixes [flink]
showuon commented on PR #24952: URL: https://github.com/apache/flink/pull/24952#issuecomment-2244119347 The CI test passed. Thanks.
Re: [PR] [hotfix] [docs] Fix java doc api [flink]
reswqa merged PR #25101: URL: https://github.com/apache/flink/pull/25101
Re: [PR] [hotfix] [base] Fix the serialization error occurring with table names containing dot [flink-cdc]
leonardBang commented on code in PR #2705: URL: https://github.com/apache/flink-cdc/pull/2705#discussion_r1687318951 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java: ## @@ -199,7 +199,7 @@ public static void writeTableSchemas( boolean useCatalogBeforeSchema = SerializerUtils.shouldUseCatalogBeforeSchema(entry.getKey()); out.writeBoolean(useCatalogBeforeSchema); -out.writeUTF(entry.getKey().toString()); +out.writeUTF(entry.getKey().toDoubleQuotedString()); Review Comment: @edmond-kk @loserwang1024 Is this a compatible change?
[PR] [cdc-runtime]SchemaRegistry Log output is repeated [flink-cdc]
ChengJie1053 opened a new pull request, #3487: URL: https://github.com/apache/flink-cdc/pull/3487 ![image](https://github.com/user-attachments/assets/0d8bbbf7-98d1-4ca2-896b-f0f27bc074ff) ![image](https://github.com/user-attachments/assets/60ba14d1-09e2-4cb8-b648-a857c76600c3) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]
shiyiky commented on PR #3432: URL: https://github.com/apache/flink-cdc/pull/3432#issuecomment-2244093727 Maybe we should consider fixing the BinaryLogClient with a LifecycleListener in a new PR, because another PR also needs this fix: https://github.com/apache/flink-cdc/pull/1915/
Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]
shiyiky commented on PR #3432: URL: https://github.com/apache/flink-cdc/pull/3432#issuecomment-2244085865 > I think adding a server id can indeed solve the issue, and a LifecycleListener is also needed for other exceptions. Can you add a basic implementation like `ReaderThreadLifecycleListener` please? Where can I add a basic implementation like `ReaderThreadLifecycleListener`?
Re: [PR] [hotfix] [base] Fix the serialization error occurring with table names containing dot [flink-cdc]
github-actions[bot] commented on PR #2705: URL: https://github.com/apache/flink-cdc/pull/2705#issuecomment-2244013184 This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.
Re: [PR] [FLINK-31362][table] Upgrade Calcite version to 1.33.0 [flink]
snuyanzin commented on PR #24255: URL: https://github.com/apache/flink/pull/24255#issuecomment-2243935706 @flinkbot run azure
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
ferenc-csaky commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1686812531 ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/CheckpointSpec.java: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.api.spec; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.kubernetes.operator.api.status.CheckpointType; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Spec for checkpoint state snapshots. */ +@Experimental +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +public class CheckpointSpec { +/** Type of checkpoint to take. */ +private CheckpointType checkpointType = CheckpointType.FULL; Review Comment: Due to [FLINK-33723](https://issues.apache.org/jira/browse/FLINK-33723), `INCREMENTAL` checkpoint triggering via REST API is disabled in Flink 1.19, so I think we may need to handle this on the operator side as well to not cause errors. 
Versions < 1.19 will work (although the actual checkpoint semantics might be undesired, see the linked Jira for details). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
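One way the operator-side handling raised in the comment above could look, as a minimal sketch: fall back to a `FULL` checkpoint when the target Flink version rejects `INCREMENTAL` triggering. The `CheckpointType` enum below is a local stand-in for the one in the diff, and `supportsIncrementalTrigger`, `resolveCheckpointType`, and the `major`/`minor` parameters are hypothetical names. Treating every version from 1.19 onward as affected is a conservative assumption; the comment itself only names 1.19.

```java
/** Sketch only: local stand-ins, not the operator's actual classes. */
class CheckpointTypeGuard {
    /** Stand-in for the CheckpointType enum referenced in the diff. */
    enum CheckpointType { FULL, INCREMENTAL }

    /**
     * Assumption: INCREMENTAL triggering is rejected from Flink 1.19 on
     * (FLINK-33723), so only versions below 1.19 may use it.
     */
    static boolean supportsIncrementalTrigger(int major, int minor) {
        return major < 1 || (major == 1 && minor < 19);
    }

    /** Returns the requested type, or FULL when INCREMENTAL is unsupported. */
    static CheckpointType resolveCheckpointType(CheckpointType requested, int major, int minor) {
        if (requested == CheckpointType.INCREMENTAL && !supportsIncrementalTrigger(major, minor)) {
            return CheckpointType.FULL;
        }
        return requested;
    }

    public static void main(String[] args) {
        // Flink 1.18: the requested INCREMENTAL type is kept.
        System.out.println(resolveCheckpointType(CheckpointType.INCREMENTAL, 1, 18));
        // Flink 1.19: the request is downgraded to FULL.
        System.out.println(resolveCheckpointType(CheckpointType.INCREMENTAL, 1, 19));
    }
}
```

Silently downgrading is only one option. Rejecting the resource with a validation error would make the limitation visible to the user instead.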
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
ferenc-csaky commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1684358470 ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java: ## @@ -90,8 +100,8 @@ public class JobSpec implements Diffable { /** * Nonce used to trigger a full redeployment of the job from the savepoint path specified in - * initialSavepointPath. In order to trigger redeployment, change the number to a different - * non-null value. Rollback is not possible after redeployment. + * initialSavepointPath or initialSavepointName. In order to trigger redeployment, change the Review Comment: I might lack context here, but there is no `initialSavepointName` field in the whole codebase other than this comment and the generated doc from it. ## flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/CheckpointSpec.java: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.api.spec; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.kubernetes.operator.api.status.CheckpointType; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Spec for checkpoint state snapshots. */ +@Experimental +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +public class CheckpointSpec { +/** Type of checkpoint to take. */ +private CheckpointType checkpointType = CheckpointType.FULL; Review Comment: Due to [FLINK-33723](https://issues.apache.org/jira/browse/FLINK-33723), `INCREMENTAL` checkpoint triggering is disabled in Flink 1.19, so I think we may need to handle this on the operator side as well to not cause errors. Versions < 1.19 will work (although the actual checkpoint semantics might be undesired, see the linked Jira for details).
Re: [PR] [FLINK-35267][snapshot] FlinkStateSnapshot documentation and examples [flink-kubernetes-operator]
ferenc-csaky commented on code in PR #854: URL: https://github.com/apache/flink-kubernetes-operator/pull/854#discussion_r1686476104 ## docs/content/docs/custom-resource/snapshots.md: ## @@ -0,0 +1,117 @@ +--- +title: "Snapshots" +weight: 3 +type: docs +aliases: + - /custom-resource/snapshots.html +--- + + +# Snapshots + +To create, list and delete snapshots you can use the custom resource called FlinkStateSnapshot. The operator will use the same controller flow as in the case of FlinkDeployment and FlinkSessionJob to trigger the savepoint/checkpoint and observe its status. + +This feature deprecates the old `savepointInfo` and `checkpointInfo` fields found in the Flink resource CR status, alongside with spec fields `initialSavepointPath`, `savepointTriggerNonce` and `checkpointTriggerNonce`. It is enabled by default by the configuration option `kubernetes.operator.snapshot.resource.enabled`. You can turn it off and the operator will keep using the deprecated status fields to track snapshots. Review Comment: nit: As a rule of thumb, IMO it is good practice to start every sentence on a new line. Markdown will render it as one paragraph, and it keeps the text more readable in most cases. If a sentence is really long, it is probably worth rewording it anyway, so this can be a good indicator for that too. :)
Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]
kumar-mallikarjuna commented on code in PR #25027: URL: https://github.com/apache/flink/pull/25027#discussion_r1686805757 ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java: ## @@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) { return this.jobManagerRunners.remove(jobId); } +public void setMainThreadExecutor(ComponentMainThreadExecutor executor) { +mainThreadExecutor = executor; +} + +public ComponentMainThreadExecutor getMainThreadExecutor() { +return mainThreadExecutor; +} + Review Comment: Done, LMK if you still want to collapse `DefaultJobManagerRunnerRegistry` and `OnMainThreadJobManagerRunnerRegistry`. ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java: ## @@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) { return this.jobManagerRunners.remove(jobId); } +public void setMainThreadExecutor(ComponentMainThreadExecutor executor) { +mainThreadExecutor = executor; +} + +public ComponentMainThreadExecutor getMainThreadExecutor() { +return mainThreadExecutor; +} + Review Comment: Incorporated. LMK if you still want to collapse `DefaultJobManagerRunnerRegistry` and `OnMainThreadJobManagerRunnerRegistry`.
Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]
kumar-mallikarjuna commented on code in PR #25027: URL: https://github.com/apache/flink/pull/25027#discussion_r1686804638 ## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java: ## @@ -164,7 +164,8 @@ void testFailingLocalCleanup() { .hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass()) .last() .isEqualTo(expectedException); - assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); +// Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered. Review Comment: Done, thanks!
Re: [PR] [FLINK-21373] Add RabbitMQ SinkV2 Implementation, Port Flink version to Flink 1.19 [flink-connector-rabbitmq]
rjbaucells commented on PR #29: URL: https://github.com/apache/flink-connector-rabbitmq/pull/29#issuecomment-2243290547 Hello, What is the status of this PR? Thanks
Re: [PR] [FLINK-31215] [autoscaler] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators [flink-kubernetes-operator]
mxm commented on PR #847: URL: https://github.com/apache/flink-kubernetes-operator/pull/847#issuecomment-2243227124 > The `actual_target_data_rate_join` still can be greater than `target_data_rate_upstream_1` or `target_data_rate_upstream_2`. It means that the upstreams' target_data_rate remains unchanged. > > Also, the `actual_target_data_rate_join` can be less than the target_data_rate of upstream, making them equal to `actual_target_data_rate_join`. But then the target_data_rate of the join will be two times greater than it was expected. > > In both cases, the upstream_1 and upstream_2 operators will remain blocked after scaling. This is why the simpler approach may not be good enough. I think it can work if we apply the same logic that we used to determine `target_data_rate_join`. As you pointed out, we determined the target data rate via: `actual_data_rate_join = actual_data_rate_upstream_1 + actual_data_rate_upstream_2` Consequently, we would need to satisfy the following equation for the backpropagation: `target_data_rate_join = target_data_rate_upstream_1 + target_data_rate_upstream_2` That would mean that each input vertex gets the following limit applied: `actual_data_rate_upstream_i = target_data_rate_upstream_i - (target_data_rate_join - actual_target_data_rate_join) / N` where `N` is the number of inputs. Do you think that would work? The benefit of this approach is that we leverage all the available information without having to add and backfeed additional factors.
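The per-input limit proposed above can be worked through with a small sketch. This is not the autoscaler's code: the class and method names are hypothetical, and the sample rates are made up purely to show the arithmetic of spreading the join's shortfall evenly across its `N` inputs.

```java
import java.util.Arrays;

/** Sketch of the proposed backpropagation formula; names are hypothetical. */
class BackpropagatedRateLimit {
    /**
     * Spreads the join's shortfall evenly over its inputs:
     * limit_i = target_i - (targetJoin - actualTargetJoin) / N.
     */
    static double[] upstreamLimits(
            double[] upstreamTargets, double targetJoin, double actualTargetJoin) {
        int n = upstreamTargets.length;
        double reductionPerInput = (targetJoin - actualTargetJoin) / n;
        double[] limits = new double[n];
        for (int i = 0; i < n; i++) {
            limits[i] = upstreamTargets[i] - reductionPerInput;
        }
        return limits;
    }

    public static void main(String[] args) {
        // Two inputs targeting 100 and 200 records/s feed a join that should
        // reach 300 but can only reach 240, so each input gives up 30.
        double[] limits = upstreamLimits(new double[] {100.0, 200.0}, 300.0, 240.0);
        System.out.println(Arrays.toString(limits)); // prints [70.0, 170.0]
    }
}
```

With these numbers the limited rates sum to 240, which matches the join's achievable rate. Note that an even split can push an input's limit below zero when its target is smaller than the per-input reduction, so a real implementation would presumably need to clamp or redistribute.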
Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]
kumar-mallikarjuna commented on code in PR #25027: URL: https://github.com/apache/flink/pull/25027#discussion_r1686721979 ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java: ## @@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) { return this.jobManagerRunners.remove(jobId); } +public void setMainThreadExecutor(ComponentMainThreadExecutor executor) { +mainThreadExecutor = executor; +} + +public ComponentMainThreadExecutor getMainThreadExecutor() { +return mainThreadExecutor; +} + Review Comment: Ah I see! I was hesitant about this because that effectively means the `cleanupExecutor` for a `JobManagerRegistry` is always the `mainThreadExecutor`. But since `JobManagerRegistry`'s current implementations don't use the `cleanupExecutor` anyway, it's fine.
Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]
whhe commented on PR #3432: URL: https://github.com/apache/flink-cdc/pull/3432#issuecomment-2242931869 I think adding a server id can indeed solve the issue, and a LifecycleListener is also needed for other exceptions. Can you add a basic implementation like `ReaderThreadLifecycleListener` please?
[PR] [FLINK-35873][cdc-connector][paimon] Add HashFunctionProvider implementation for PaimonDataSink. [flink-cdc]
lvyanquan opened a new pull request, #3486: URL: https://github.com/apache/flink-cdc/pull/3486 A followup of https://github.com/apache/flink-cdc/pull/3414.
Re: [PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]
lsyldliu closed pull request #25108: [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode URL: https://github.com/apache/flink/pull/25108
Re: [PR] [FLINK-34877][cdc] Flink CDC pipeline transform supports type conversion [flink-cdc]
leonardBang commented on PR #3357: URL: https://github.com/apache/flink-cdc/pull/3357#issuecomment-2242770101 > Is it possible to add actual values conversion checks besides existing syntax parsing tests? And it should cover cases where `NULL` value is passed as conversion argument. @aiwenmo I think @yuxiqian's idea is reasonable, could you add more value tests?
Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]
XComp commented on code in PR #25027: URL: https://github.com/apache/flink/pull/25027#discussion_r1686411653 ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java: ## @@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) { return this.jobManagerRunners.remove(jobId); } +public void setMainThreadExecutor(ComponentMainThreadExecutor executor) { +mainThreadExecutor = executor; +} + +public ComponentMainThreadExecutor getMainThreadExecutor() { +return mainThreadExecutor; +} + Review Comment: `JobManagerRunnerRegistry` would implement `LocallyCleanableInMainThreadResource` instead of `LocallyCleanableResource`. The conversion of the `LocallyCleanableInMainThreadResource#localCleanupAsync` callback to the `LocallyCleanableResource#localCleanupAsync` callback would then happen in `DispatcherResourceCleanerFactory#create[Loc|Glob]alResourceCleaner`.
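The conversion described in the review comment above amounts to binding the main-thread executor once and exposing the two-argument callback. A minimal sketch follows, assuming simplified stand-in interfaces: `String` replaces `JobID`, the `CompletableFuture<Void>` return type is a guess, and the helper `toLocallyCleanableResource` is a hypothetical name, not a method of `DispatcherResourceCleanerFactory`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Sketch only: simplified stand-ins for the interfaces named in the review. */
class MainThreadCleanupAdapterSketch {
    /** Stand-in for the existing two-argument cleanup callback. */
    interface LocallyCleanableResource {
        CompletableFuture<Void> localCleanupAsync(String jobId, Executor cleanupExecutor);
    }

    /** Stand-in for the proposed twin interface with an extra executor. */
    interface LocallyCleanableInMainThreadResource {
        CompletableFuture<Void> localCleanupAsync(
                String jobId, Executor cleanupExecutor, Executor mainThreadExecutor);
    }

    /** Hypothetical helper: binds the main-thread executor once, up front. */
    static LocallyCleanableResource toLocallyCleanableResource(
            LocallyCleanableInMainThreadResource resource, Executor mainThreadExecutor) {
        return (jobId, cleanupExecutor) ->
                resource.localCleanupAsync(jobId, cleanupExecutor, mainThreadExecutor);
    }

    public static void main(String[] args) {
        // A direct executor plays the role of the main thread in this demo.
        Executor direct = Runnable::run;
        LocallyCleanableInMainThreadResource registry =
                (jobId, cleanupExecutor, mainThreadExecutor) ->
                        CompletableFuture.runAsync(
                                () -> System.out.println("unregistering " + jobId),
                                mainThreadExecutor);
        toLocallyCleanableResource(registry, direct).localCleanupAsync("job-1", direct).join();
    }
}
```

The design point is that callers keep working against the two-argument interface, so only the factory that builds the cleaner needs to know about the main-thread executor.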
Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]
flinkbot commented on PR #25109: URL: https://github.com/apache/flink/pull/25109#issuecomment-2242532958 ## CI report: * fc3fe3cbe67d160f08dec878039f66d42a91b29d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]
dylanhz opened a new pull request, #25109: URL: https://github.com/apache/flink/pull/25109 ## What is the purpose of the change Add CONV function. Examples: ```SQL > SELECT CONV(7, 10, 2); 111 ``` ## Brief change log [FLINK-35864](https://issues.apache.org/jira/browse/FLINK-35864) **This implementation is slightly different from the original Hive version. (BaseConversionUtils line 190)** ## Verifying this change `MathFunctionsITCase#convTestCases` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (docs)
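For readers unfamiliar with `CONV`, the base conversion in the example above can be illustrated with plain JDK calls. This is a simplified sketch, not the PR's implementation (which follows Hive's `BaseConversionUtils`): the class and method names are hypothetical, it only covers values that fit in a signed `long` with bases from 2 to 36, and it ignores `NULL`, invalid digits, and any unsigned or overflow semantics the real function may define.

```java
import java.util.Locale;

/** Simplified illustration of base conversion; not Flink's CONV implementation. */
class ConvSketch {
    /** Parses {@code number} in {@code fromBase} and renders it in {@code toBase}. */
    static String conv(String number, int fromBase, int toBase) {
        long value = Long.parseLong(number, fromBase);
        // Upper-case digits are a stylistic choice made for this sketch.
        return Long.toString(value, toBase).toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(conv("7", 10, 2));    // prints 111
        System.out.println(conv("255", 10, 16)); // prints FF
    }
}
```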
Re: [PR] [FLINK-32501][table-planner] Fix wrong plan of a proctime window aggregation generated due to incorrect cost evaluation [flink]
Myasuka commented on PR #22964: URL: https://github.com/apache/flink/pull/22964#issuecomment-2242408382 It seems this backport was not merged into the release-1.17 branch?
Re: [PR] [FLINK-35318][table-planner] use UTC timezone to handle TIMESTAMP_WITHOUT_TIME_ZONE type in RexNodeToExpressionConverter [flink]
lshangq commented on PR #24787: URL: https://github.com/apache/flink/pull/24787#issuecomment-2242267464 @leonardBang Could you please take a look when you have time? cherry-pick to release-1.19 https://github.com/apache/flink/pull/25093 cherry-pick to release-1.20 https://github.com/apache/flink/pull/25094
Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]
shiyiky commented on PR #3432: URL: https://github.com/apache/flink-cdc/pull/3432#issuecomment-2242253725 @whhe PTAL
Re: [PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]
hackergin commented on PR #25108: URL: https://github.com/apache/flink/pull/25108#issuecomment-2242204010 @lsyldliu Thank you for reviewing. I have addressed the related comments; please have a look
Re: [PR] [FLINK-35865][base] Support Byte and Short in ObjectUtils [flink-cdc]
GOODBOY008 commented on PR #3481: URL: https://github.com/apache/flink-cdc/pull/3481#issuecomment-2242135166 @lvyanquan PTAL
Re: [PR] [FLINK-35871][doc] add "snapshot" to mysql connector startup options. [flink-cdc]
leonardBang merged PR #3484: URL: https://github.com/apache/flink-cdc/pull/3484
[PR] [minor][cdc-connector][mysql] catch all exceptions to display the table info [flink-cdc]
lvyanquan opened a new pull request, #3485: URL: https://github.com/apache/flink-cdc/pull/3485 parseDDL doesn't always throw ParsingException, and the table info is lost in that case. ![image](https://github.com/user-attachments/assets/5d053c50-3b80-4a9b-9994-a86a8c9a6be9)
Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]
kumar-mallikarjuna commented on code in PR #25027: URL: https://github.com/apache/flink/pull/25027#discussion_r1685926520 ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java: ## @@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) { return this.jobManagerRunners.remove(jobId); } +public void setMainThreadExecutor(ComponentMainThreadExecutor executor) { +mainThreadExecutor = executor; +} + +public ComponentMainThreadExecutor getMainThreadExecutor() { +return mainThreadExecutor; +} + Review Comment: > But what we can do is making the main thread passing an implementation detail of the `DispatcherResourceCleanerFactory`: We could introduce a twin interface `LocallyCleanableWithMainThreadResource`: > > ```java > @FunctionalInterface > public interface LocallyCleanableInMainThreadResource { > > CompletableFuture localCleanupAsync( > JobID jobId, Executor cleanupExecutor, Executor mainThreadExecutor); > } > ``` > > That interface can be converted to a `LocallyCleanableResource` in `DispatcherResourceCleanerFactory`. We wouldn't have to change the `Dispatcher` instantiation in that case. WDYT? I didn't get this. Which classes would implement `LocallyCleanableInMainThreadResource`? Can you elaborate more?
Re: [PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]
lsyldliu commented on code in PR #25108: URL: https://github.com/apache/flink/pull/25108#discussion_r1685926056 ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java: ## @@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() { assertThat(actualStatement).isEqualTo(expectedStatement); } -@Test -void testGetPeriodRefreshPartition() { -String schedulerTime = "2024-01-01 00:00:00"; -Map tableOptions = new HashMap<>(); -tableOptions.put("partition.fields.day.date-formatter", "-MM-dd"); -tableOptions.put("partition.fields.hour.date-formatter", "HH"); - +@ParameterizedTest(name = "{index}: {0}") +@MethodSource("testData") +void testGetPeriodRefreshPartition(TestSpec testSpec) { ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", "database", "table"); -Map actualRefreshPartition = -MaterializedTableManager.getPeriodRefreshPartition( -schedulerTime, objectIdentifier, tableOptions, ZoneId.systemDefault()); -Map expectedRefreshPartition = new HashMap<>(); -expectedRefreshPartition.put("day", "2024-01-01"); -expectedRefreshPartition.put("hour", "00"); +if (testSpec.errorMessage == null) { +Map actualRefreshPartition = +MaterializedTableManager.getPeriodRefreshPartition( +testSpec.schedulerTime, +testSpec.freshness, +objectIdentifier, +testSpec.tableOptions, +ZoneId.systemDefault()); -assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition); + assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition); +} else { +assertThatThrownBy( +() -> + MaterializedTableManager.getPeriodRefreshPartition( +testSpec.schedulerTime, +testSpec.freshness, +objectIdentifier, +testSpec.tableOptions, +ZoneId.systemDefault())) +.hasMessage(testSpec.errorMessage); +} } -@Test -void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() { -// scheduler time is null -Map tableOptions = new HashMap<>(); 
-tableOptions.put("partition.fields.day.date-formatter", "-MM-dd"); -tableOptions.put("partition.fields.hour.date-formatter", "HH"); +static Stream testData() { +return Stream.of( +// The interval of freshness match the partition specified by the 'date-formatter'. +TestSpec.create() +.schedulerTime("2024-01-01 00:00:00") +.freshness(IntervalFreshness.ofDay("1")) +.tableOptions("partition.fields.day.date-formatter", "-MM-dd") +.expectedRefreshPartition("day", "2023-12-31"), +TestSpec.create() +.schedulerTime("2024-01-02 00:00:00") +.freshness(IntervalFreshness.ofDay("1")) +.tableOptions("partition.fields.day.date-formatter", "-MM-dd") +.expectedRefreshPartition("day", "2024-01-01"), +TestSpec.create() +.schedulerTime("2024-01-02 00:00:00") +.freshness(IntervalFreshness.ofHour("1")) +.tableOptions("partition.fields.day.date-formatter", "-MM-dd") +.tableOptions("partition.fields.hour.date-formatter", "HH") +.expectedRefreshPartition("day", "2024-01-01") +.expectedRefreshPartition("hour", "23"), +TestSpec.create() +.schedulerTime("2024-01-02 01:00:00") +.freshness(IntervalFreshness.ofHour("1")) +.tableOptions("partition.fields.day.date-formatter", "-MM-dd") +.tableOptions("partition.fields.hour.date-formatter", "HH") +.expectedRefreshPartition("day", "2024-01-02") +.expectedRefreshPartition("hour", "00"), +TestSpec.create() +.schedulerTime("2024-01-01 00:00:00") +.freshness(IntervalFreshness.ofHour("2")) +.tableOptions("partition.fields.day.date-formatter", "-MM-dd") +.tableOptions("partition.fields.hour.date-formatter", "HH") +
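The expectations in the test data above suggest that the refreshed partition is derived from the scheduler time minus the freshness interval, formatted with each partition field's date formatter. The sketch below reproduces only that inferred rule. It is not `MaterializedTableManager.getPeriodRefreshPartition`: the class and method names are hypothetical, `Duration` stands in for `IntervalFreshness`, time zones are ignored, and the `yyyy-MM-dd` day pattern is an assumption, since the archived text shows it as `-MM-dd`.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of the rule inferred from the test expectations; names are hypothetical. */
class PeriodRefreshPartitionSketch {
    private static final DateTimeFormatter SCHEDULER_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Shifts the scheduler time back by one freshness interval and formats the
     * result once per partition field (field name to date pattern).
     */
    static Map<String, String> refreshPartition(
            String schedulerTime, Duration freshness, Map<String, String> partitionFormatters) {
        LocalDateTime partitionTime =
                LocalDateTime.parse(schedulerTime, SCHEDULER_TIME_FORMAT).minus(freshness);
        Map<String, String> partition = new LinkedHashMap<>();
        partitionFormatters.forEach(
                (field, pattern) ->
                        partition.put(
                                field,
                                partitionTime.format(DateTimeFormatter.ofPattern(pattern))));
        return partition;
    }

    public static void main(String[] args) {
        Map<String, String> formatters = new LinkedHashMap<>();
        formatters.put("day", "yyyy-MM-dd");
        formatters.put("hour", "HH");
        // Mirrors the hourly case at midnight: the previous day's last hour.
        System.out.println(
                refreshPartition("2024-01-02 00:00:00", Duration.ofHours(1), formatters));
    }
}
```

This matches the fully visible cases in the diff, for example a scheduler time of `2024-01-01 00:00:00` with a one-day freshness yielding `2023-12-31`. Cases where the freshness does not line up with the partition granularity are cut off in the archived message and are not covered here.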
Re: [PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]
lsyldliu commented on code in PR #25108: URL: https://github.com/apache/flink/pull/25108#discussion_r1685891811 ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ## @@ -1520,6 +1520,58 @@ void testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws E .asSerializableString())); } +@Test +void testMaterializedTableDefinitionQueryContainsTemporaryResources() throws Exception { Review Comment: Is this test unrelated to your fix? ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ## @@ -1417,7 +1417,7 @@ void testPeriodicRefreshMaterializedTableWithPartitionOptions() throws Exception sessionHandle, materializedTableIdentifier.asSerializableString(), true, -"2024-01-02 00:00:00", +"2024-01-03 00:00:00", Review Comment: I think this test should be restructured as follows: ``` List data = new ArrayList<>(); // create materialized table with partition formatter createAndVerifyCreateMaterializedTableWithData( "my_materialized_table", data, Collections.singletonMap("ds", "-MM-dd"), RefreshMode.FULL); ObjectIdentifier materializedTableIdentifier = ObjectIdentifier.of( fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table"); // add data to all data list data.add(Row.of(1L, 1L, 1L, "2024-01-01")); data.add(Row.of(2L, 2L, 2L, "2024-01-02")); data.add(Row.of(4L, 4L, 4L, "2024-01-02")); // refresh the materialized table with period schedule long startTime = System.currentTimeMillis(); OperationHandle periodRefreshTableHandle = service.refreshMaterializedTable( sessionHandle, materializedTableIdentifier.asSerializableString(), true, "2024-01-03 00:00:00", Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); ``` Only then does the periodic refresh match full mode, not continuous mode. 
## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java: ## @@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() { assertThat(actualStatement).isEqualTo(expectedStatement); } -@Test -void testGetPeriodRefreshPartition() { -String schedulerTime = "2024-01-01 00:00:00"; -Map tableOptions = new HashMap<>(); -tableOptions.put("partition.fields.day.date-formatter", "-MM-dd"); -tableOptions.put("partition.fields.hour.date-formatter", "HH"); - +@ParameterizedTest(name = "{index}: {0}") +@MethodSource("testData") +void testGetPeriodRefreshPartition(TestSpec testSpec) { ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", "database", "table"); -Map actualRefreshPartition = -MaterializedTableManager.getPeriodRefreshPartition( -schedulerTime, objectIdentifier, tableOptions, ZoneId.systemDefault()); -Map expectedRefreshPartition = new HashMap<>(); -expectedRefreshPartition.put("day", "2024-01-01"); -expectedRefreshPartition.put("hour", "00"); +if (testSpec.errorMessage == null) { +Map actualRefreshPartition = +MaterializedTableManager.getPeriodRefreshPartition( +testSpec.schedulerTime, +testSpec.freshness, +objectIdentifier, +testSpec.tableOptions, +ZoneId.systemDefault()); -assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition); + assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition); +} else { +assertThatThrownBy( +() -> + MaterializedTableManager.getPeriodRefreshPartition( +testSpec.schedulerTime, +testSpec.freshness, +objectIdentifier, +testSpec.tableOptions, +ZoneId.systemDefault())) +
Re: [PR] [FLINK-35237] Allow Sink to Choose HashFunction in PrePartitionOperator [flink-cdc]
leonardBang merged PR #3414: URL: https://github.com/apache/flink-cdc/pull/3414 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34944] Use Incremental Source Framework in Flink CDC OceanBase Source Connector [flink-cdc]
github-actions[bot] commented on PR #3211: URL: https://github.com/apache/flink-cdc/pull/3211#issuecomment-2241824995 This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.
Re: [PR] [FLINK-23589][flink-avro] Support microsecond precision for timestamp [flink]
asantoz commented on PR #19537: URL: https://github.com/apache/flink/pull/19537#issuecomment-2241809600 Hey, why is this blocked?
Re: [PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]
flinkbot commented on PR #25108: URL: https://github.com/apache/flink/pull/25108#issuecomment-2241624001

## CI report:

* 6bd73ab3f17dccbecb1113cd0f5aaedec0579f78 UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]
hackergin opened a new pull request, #25108: URL: https://github.com/apache/flink/pull/25108

## What is the purpose of the change

*Fix the incorrect partition generation during period refresh in Full Mode*

## Brief change log

- Fix the incorrect partition generation during period refresh in Full Mode

## Verifying this change

- Add test cases to `MaterializedTableManagerTest#testGetPeriodRefreshPartition` to verify the change.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (no)
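For readers following the review, here is a rough sketch of the behaviour the fix appears to target. It is not the Flink implementation. It assumes that in full mode the refreshed partition is derived from the scheduler time minus the table's freshness, which is only inferred from the review thread, where a scheduler time of `2024-01-03 00:00:00` is paired with data in partition `2024-01-02`. The class `PeriodRefreshPartitionSketch` and the method `refreshPartition` are hypothetical names, and time zone handling (the `ZoneId` argument seen in the test) is left out for brevity. Only the `partition.fields.<name>.date-formatter` option pattern comes from the diff.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration only; not part of Flink. */
public class PeriodRefreshPartitionSketch {

    private static final String PREFIX = "partition.fields.";
    private static final String SUFFIX = ".date-formatter";
    private static final DateTimeFormatter SCHEDULER_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Assumption: the partition to refresh is the one covering
     * (schedulerTime - freshness), formatted per partition field.
     */
    public static Map<String, String> refreshPartition(
            String schedulerTime, Duration freshness, Map<String, String> tableOptions) {
        LocalDateTime target =
                LocalDateTime.parse(schedulerTime, SCHEDULER_TIME_FORMAT).minus(freshness);

        Map<String, String> partition = new LinkedHashMap<>();
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            String key = option.getKey();
            // Only options shaped like partition.fields.<name>.date-formatter are relevant.
            if (key.length() > PREFIX.length() + SUFFIX.length()
                    && key.startsWith(PREFIX)
                    && key.endsWith(SUFFIX)) {
                String field = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern(option.getValue());
                partition.put(field, target.format(formatter));
            }
        }
        return partition;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("partition.fields.ds.date-formatter", "yyyy-MM-dd");
        // Scheduler time 2024-01-03 with a one-day freshness targets partition 2024-01-02.
        System.out.println(
                refreshPartition("2024-01-03 00:00:00", Duration.ofDays(1), options));
    }
}
```

Under these assumptions, a refresh scheduled for `2024-01-03 00:00:00` with a one-day freshness would target `ds=2024-01-02`, which matches the scheduler time suggested in the review comment.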