Re: [PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] add support for protobuf-confluent [flink]

2024-07-23 Thread via GitHub


dmariassy commented on PR #24482:
URL: https://github.com/apache/flink/pull/24482#issuecomment-2246151636

   Draft PR for the alternative implementation: 
https://github.com/apache/flink/pull/25114


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [DRAFT][FLINK-34440][formats][protobuf-confluent] Protobuf confluent dynamic format [flink]

2024-07-23 Thread via GitHub


dmariassy opened a new pull request, #25114:
URL: https://github.com/apache/flink/pull/25114

   **This is a DRAFT PR**.
   
   ## TODO
   
   - Add missing boilerplate (e.g. config)
   - Add debezium support
   - Test the format in Shopify Flink jobs
   - Docs (might be a separate PR)
   
   ## What is the purpose of the change
   
   - Add support for deserializing protobuf messages using the Confluent wire 
format and whose schemas can be fetched from Confluent Schema Registry
   - Add support for serializing Flink records using the Confluent protobuf 
wire format
   
   ## Brief change log
   
   My intention was to:
   - Maintain parity with the existing flink-protobuf format's semantics in 
terms of the Flink -> Protobuf / Protobuf -> Flink conversions
   - Maximize code reuse between flink-protobuf-confluent and flink-protobuf 
formats
   
   ### Deserializer
   
   - Fetch the message's protobuf descriptor from the Confluent schema registry
   - Generate a java class from the descriptor at runtime
   - Deserialize `byte[]`s to the generated `protobuf.Message` type using a 
`io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer`
   - Delegate the work of converting between a `protobuf.Message` and a 
`RowData` object to the existing flink-protobuf format
   
   ### Serializer
   
   - Convert the user's `RowType` to a protobuf descriptor
   - Generate a java class from the descriptor at runtime
   - Delegate the `RowData` -> `AbstractMessage` conversion to the existing 
flink-protobuf format
   - Serialize the `AbstractMessage` object using a 
`io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer`
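
For readers unfamiliar with the Confluent wire format mentioned above, here is a minimal sketch of the framing header, assuming the commonly documented layout: one magic byte (`0`) followed by a 4-byte big-endian schema ID. For protobuf payloads a message-index list follows the schema ID before the serialized message; this sketch does not parse it. The class and method names are hypothetical and are not part of this PR, which delegates the framing to `KafkaProtobufDeserializer`.

```java
import java.nio.ByteBuffer;

/** Illustrative only: reads the framing header that precedes a Confluent-encoded payload. */
public final class ConfluentHeaderSketch {

    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    private ConfluentHeaderSketch() {}

    /** Returns the schema ID stored in bytes 1..4 of the payload. */
    public static int readSchemaId(byte[] payload) {
        if (payload == null || payload.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Payload is too short to carry a Confluent header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Magic byte 0, schema ID 42, then arbitrary bytes standing in for the rest of the record.
        byte[] framed = {0, 0, 0, 0, 42, 0, 8, 1};
        System.out.println(readSchemaId(framed));
    }
}
```

The schema ID is what a deserializer would use to look up the descriptor in the schema registry before decoding the remaining bytes.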
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   This change added tests and can be verified as follows:
   
   - Added comprehensive test coverage
   - Will shortly deploy to Shopify Flink clusters 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes 
(com.github.os72:protoc-jar)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs to follow
   





Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688472333


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -716,6 +695,48 @@ public CheckpointFetchResult fetchCheckpointInfo(
 }
 }
 
+@Override
+public CheckpointStatsResult fetchCheckpointStats(
+String jobId, Long checkpointId, Configuration conf) {
+try (RestClusterClient clusterClient = getClusterClient(conf)) 
{
+var checkpointStatusHeaders = 
CheckpointStatisticDetailsHeaders.getInstance();
+var parameters = 
checkpointStatusHeaders.getUnresolvedMessageParameters();
+parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
+
+// This was needed because the parameter is protected
+var checkpointIdPathParameter =
+(CheckpointIdPathParameter) 
Iterables.getLast(parameters.getPathParameters());
+checkpointIdPathParameter.resolve(checkpointId);
+
+var response =
+clusterClient.sendRequest(
+checkpointStatusHeaders, parameters, 
EmptyRequestBody.getInstance());
+
+var stats = response.get();
+if (stats == null) {
+throw new IllegalStateException("Checkpoint ID %d for job %s 
does not exist!");
+} else if (stats instanceof 
CheckpointStatistics.CompletedCheckpointStatistics) {
+return CheckpointStatsResult.completed(
+((CheckpointStatistics.CompletedCheckpointStatistics) 
stats)
+.getExternalPath());
+} else if (stats instanceof 
CheckpointStatistics.FailedCheckpointStatistics) {
+return CheckpointStatsResult.error(
+((CheckpointStatistics.FailedCheckpointStatistics) 
stats)
+.getFailureMessage());
+} else if (stats instanceof 
CheckpointStatistics.PendingCheckpointStatistics) {
+return CheckpointStatsResult.pending();
+} else {
+throw new IllegalArgumentException(
+String.format(
+"Unknown checkpoint statistics result class: 
%s",
+stats.getClass().getSimpleName()));
+}
+} catch (Exception e) {
+LOG.error("Exception while fetching checkpoint statistics", e);

Review Comment:
   We should handle cases where the checkpoint statistics are no longer stored 
on the web server, and we get the following error: 
   ```
   Could not find checkpointing statistics for checkpoint 243.
   ```






Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688463075


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java:
##
@@ -53,6 +62,16 @@ private static String format(@NonNull CommonStatus 
status) {
 : status.getError());
 }
 
+private static String format(@NonNull FlinkStateSnapshotStatus status) {
+if (StringUtils.isEmpty(status.getError())) {
+return String.format(
+">>> Status[Snapshot] | Info | %s | %s", 
status.getState(), status.getPath());

Review Comment:
   I have added some changes for this, thank you!






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-23 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1688281873


##
flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java:
##
@@ -25,14 +25,6 @@
 /** Covers construction, defaults and sanity checking of {@link 
SqsSinkBuilder}. */
 class SqsSinkBuilderTest {
 
-@Test
-void elementConverterOfSinkMustBeSetWhenBuilt() {
-Assertions.assertThatExceptionOfType(NullPointerException.class)
-.isThrownBy(() -> 
SqsSink.builder().setSqsUrl("sqlUrl").build())
-.withMessageContaining(
-"No SerializationSchema was supplied to the SQS Sink 
builder.");
-}
-

Review Comment:
   This test was removed because we now add a default SerializationSchema when 
the customer does not provide one, so this error is never thrown.






Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1688271544


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java:
##


Review Comment:
   I modified the expression evaluation logic here because the previous version 
could not correctly handle expressions like `filter: a > 1 and a < 2`: it 
generated duplicated argument names, which would crash the operator.






Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1688271544


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java:
##


Review Comment:
   I modified the expression evaluation logic here because the previous version 
could not correctly handle expressions like `filter: a > 1 and a < 2`: it 
generated duplicated argument names, which would crash the Janino compiler.
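
To illustrate the duplicated-argument problem, here is a minimal sketch of one way to collect each referenced column only once, so that an expression mentioning `a` twice still yields a single argument. This is not the code in `TransformFilterProcessor`; the class name, the regex-based tokenizing, and the `knownColumns` parameter are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: collects the distinct columns referenced by a filter expression. */
public final class DistinctArgumentsSketch {

    // Naive identifier matcher; a real implementation would walk the parsed expression instead.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    private DistinctArgumentsSketch() {}

    /** Returns the referenced columns in first-seen order, without duplicates. */
    public static List<String> referencedColumns(String expression, Set<String> knownColumns) {
        Set<String> seen = new LinkedHashSet<>(); // keeps insertion order and drops repeats
        Matcher matcher = IDENTIFIER.matcher(expression);
        while (matcher.find()) {
            String token = matcher.group();
            if (knownColumns.contains(token)) {
                seen.add(token);
            }
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        // "a" appears twice in the expression but is returned once.
        System.out.println(referencedColumns("a > 1 and a < 2", Set.of("a", "b")));
    }
}
```

Using the deduplicated list as the parameter list of the generated method avoids declaring the same parameter name twice.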






Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688262282


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Optional;
+
+/** Context for reconciling a snapshot. */
+@Getter
+@RequiredArgsConstructor
+public class FlinkStateSnapshotContext {
+
+private final FlinkStateSnapshot resource;
+private final FlinkStateSnapshotStatus originalStatus;
+private final Context josdkContext;
+private final FlinkConfigManager configManager;
+
+private FlinkOperatorConfiguration operatorConfig;
+private Configuration referencedJobObserveConfig;
+private FlinkDeployment referencedJobFlinkDeployment;
+
+/**
+ * @return Operator configuration for this resource.
+ */
+public FlinkOperatorConfiguration getOperatorConfig() {
+if (operatorConfig != null) {
+return operatorConfig;
+}
+return operatorConfig =
+configManager.getOperatorConfiguration(
+getResource().getMetadata().getNamespace(), null);

Review Comment:
   Added lazy getter as well, thank you!






Re: [PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]

2024-07-23 Thread via GitHub


klam-shop commented on code in PR #109:
URL: 
https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1688236979


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##
@@ -144,14 +147,34 @@ public void open(
 valueSerialization.open(context);
 }
 
+private String getTargetTopic(RowData element) {
+final String topic = readMetadata(element, 
KafkaDynamicSink.WritableMetadata.TOPIC);
+if (topic == null && topics == null) {
+throw new IllegalArgumentException(
+"The topic of the sink record is not valid. Expected a 
single topic but no topic is set.");
+} else if (topic == null && topics.size() == 1) {
+return topics.get(0);
+} else if (topics != null && topics.size() > 0 && 
!topics.contains(topic)) {

Review Comment:
   Open to discussion: I decided to keep `topics` as a List, but we can change 
it to a HashSet. I am assuming the list will be short on average.
   
   A List can outperform a HashSet for a small number of elements:
   https://stackoverflow.com/questions/150750/hashset-vs-list-performance






Re: [PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink]

2024-07-23 Thread via GitHub


klam-shop commented on PR #16142:
URL: https://github.com/apache/flink/pull/16142#issuecomment-2245500551

   Hi, I've taken some time to build upon [Nicholas 
Jiang](https://issues.apache.org/jira/secure/ViewProfile.jspa?name=nicholasjiang)'s 
PR and port it to the new Kafka connector repo:
   
   https://github.com/apache/flink-connector-kafka/pull/109
   
   Would appreciate feedback on this approach, let me know.





[PR] [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]

2024-07-23 Thread via GitHub


klam-shop opened a new pull request, #109:
URL: https://github.com/apache/flink-connector-kafka/pull/109

   ## What is the purpose of the change
   Allows writing to different Kafka topics based on the `topic` metadata 
column value in SQL, and updates the Table API's `KafkaDynamicSink` to accept a 
`List<String> topics` instead of `String topic`. The list acts as a whitelist 
of acceptable values for the `topic` metadata column: 
   
   - If a single topic is provided, it is used by default for the target topic 
to produce to
   - If a list is provided, only that list of topics can be produced to
   - If no list is provided, any value is accepted
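
The three rules above can be sketched as follows. This is an illustration rather than the code in `DynamicKafkaRecordSerializationSchema`: the class name and method signature are hypothetical, and rejecting a record that carries no topic when several topics are configured is an assumption.

```java
import java.util.List;

/** Illustrative only: resolves the target topic from record metadata and the configured list. */
public final class TopicSelectionSketch {

    private TopicSelectionSketch() {}

    public static String selectTopic(String metadataTopic, List<String> allowedTopics) {
        boolean hasList = allowedTopics != null && !allowedTopics.isEmpty();
        if (metadataTopic == null) {
            // A single configured topic acts as the default target.
            if (hasList && allowedTopics.size() == 1) {
                return allowedTopics.get(0);
            }
            throw new IllegalArgumentException(
                    "No topic set on the record and no single default topic configured");
        }
        // A configured list restricts which metadata values are accepted.
        if (hasList && !allowedTopics.contains(metadataTopic)) {
            throw new IllegalArgumentException(
                    "Topic '" + metadataTopic + "' is not in the configured list " + allowedTopics);
        }
        // Without a list, any metadata value is accepted.
        return metadataTopic;
    }

    public static void main(String[] args) {
        System.out.println(selectTopic(null, List.of("orders")));
        System.out.println(selectTopic("audit", null));
        System.out.println(selectTopic("orders", List.of("orders", "returns")));
    }
}
```

The membership check is a linear scan, which matches the assumption elsewhere in this thread that the configured list stays short.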
   
   Builds on the work in https://github.com/apache/flink/pull/16142 by 
@SteNicholas 
   
   ## Brief change log
   - Adds `topic` as writable metadata
   - Updates Table API Kafka sink to accept `List<String> topics` instead of 
`String topic`, mirroring the source side.
   - Implements whitelist behaviour in `DynamicKafkaRecordSerializationSchema`
   
   ## Verifying this change
   - [x] Tests that the Sink Factory and related machinery works as expected
   - [x] Tests the various valid and invalid scenarios for `topic` metadata in 
`DynamicKafkaRecordSerializationSchema`
   
   ## Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): no
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
   - The serializers: yes
   - The runtime per-record code paths (performance sensitive): yes
   - Anything that affects deployment or recovery: no
   - The S3 file system connector: no
   
   ## Documentation
   
   - Does this pull request introduce a new feature? yes
   - If yes, how is the feature documented? updated documentation 
   
   
   
   
   
   
   





Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688221106


##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkStateSnapshotState.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.status;
+
+import org.apache.flink.annotation.Experimental;
+
+/** Describes current snapshot state. */
+@Experimental
+public enum FlinkStateSnapshotState {

Review Comment:
   Good idea, thank you!
   






Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2245470485

   Done, rebased due to some conflicts with 
https://github.com/apache/flink-cdc/commit/26ff6d2a081181f3df7aa49d65d804c57c634122.
 Will add `CAST ... AS` tests after #3357 is merged.





Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


gyfora commented on code in PR #855:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688196270


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -289,6 +289,8 @@ public boolean 
reconcileOtherChanges(FlinkResourceContext ctx) throws Except
 LOG.info("Stopping failed Flink job...");
 cleanupAfterFailedJob(ctx);
 status.setError(null);
+ReconciliationUtils.updateStatusForDeployedSpec(
+ctx.getResource(), 
ctx.getDeployConfig(ctx.getResource().getSpec()), clock);

Review Comment:
   Hm, I see, I think you are right. I think the simpler and more 
straightforward fix would be to fix the `resubmitJob` method itself. In the case 
where we have a savepoint, it should set the upgradeMode to savepoint on the 
spec to recover. Otherwise it can leave it stateless. What do you think?
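
A minimal sketch of the decision proposed above, assuming the only input is whether a last savepoint path is known. The enum is a stand-in for the operator's `UpgradeMode`, and the class and method names are hypothetical; the real `resubmitJob` works on the resource spec and status rather than on a bare path.

```java
import java.util.Optional;

/** Illustrative only: picks the upgrade mode to use when resubmitting a failed job. */
public final class ResubmitModeSketch {

    /** Stand-in for the operator's UpgradeMode enum. */
    public enum UpgradeMode {
        STATELESS,
        LAST_STATE,
        SAVEPOINT
    }

    private ResubmitModeSketch() {}

    /** Uses SAVEPOINT when a non-blank savepoint path is available, STATELESS otherwise. */
    public static UpgradeMode modeForResubmit(Optional<String> lastSavepointPath) {
        boolean hasSavepoint = lastSavepointPath.filter(path -> !path.isBlank()).isPresent();
        return hasSavepoint ? UpgradeMode.SAVEPOINT : UpgradeMode.STATELESS;
    }

    public static void main(String[] args) {
        System.out.println(modeForResubmit(Optional.of("s3://bucket/checkpoints/chk-243")));
        System.out.println(modeForResubmit(Optional.empty()));
    }
}
```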






Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


chenyuzhi459 commented on code in PR #855:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1688138227


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -289,6 +289,8 @@ public boolean 
reconcileOtherChanges(FlinkResourceContext ctx) throws Except
 LOG.info("Stopping failed Flink job...");
 cleanupAfterFailedJob(ctx);
 status.setError(null);
+ReconciliationUtils.updateStatusForDeployedSpec(
+ctx.getResource(), 
ctx.getDeployConfig(ctx.getResource().getSpec()), clock);

Review Comment:
   Assume a flink deployment is submitted to the flink-kubernetes-operator for 
the first time with the following settings
   
   ```
   spec.job.upgradeMode=savepoint
   spec.job.initialSavepointPath=null
   spec.flinkConfiguration.execution.checkpointing.interval=60s
   ```
   
   Here is the startup and failover process of flink-kubernetes-operator as I 
understand it:
   
   1. At the first reconcile, in the method 
[AbstractFlinkResourceReconciler.updateStatusBeforeFirstDeployment](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L224), 
the `spec.job.upgradeMode` of the cloned deployment is set to STATELESS because 
`spec.job.initialSavepointPath` is empty. This change is not synchronously 
written back to the original deployment in k8s, so the original deployment's 
`spec.job.upgradeMode` is still savepoint. The cloned spec is then serialized 
into `status.reconciliationStatus.lastReconciledSpec`, and this step is 
synchronously written to the original deployment in k8s (I haven't yet studied 
why this happens).
   
   
   2. After running for a period of time, the deployment may encounter a 
problem and exit with a failed status. The operator will save the latest 
checkpoint in the `status.jobStatus.savepointInfo.lastSavepoint` of the original 
deployment in the method `SnapshotObserver.observeSavepointStatus`.
   
   3. Then, in the method 
[AbstractJobReconciler.resubmitJob](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L315), 
the lastReconciledSpec of the original deployment is read into the 
specToRecover variable and passed to the method 
[AbstractJobReconciler.restore](https://github.com/apache/flink-kubernetes-operator/blob/29076c80eaac5547e3d12f703c43780cd4a52dad/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L260). 
The method `AbstractJobReconciler.restore` decides whether to recover from 
lastSavepoint based on whether the `spec.job.upgradeMode` of specToRecover is 
STATELESS. Before the fix, the upgradeMode here is always STATELESS.
   
   
   
   Therefore, in failover scenarios, I think serializing the original 
deployment's `spec.job.upgradeMode=SAVEPOINT` to 
`status.reconciliationStatus.lastReconciledSpec` before resubmitJob can solve 
this problem.
   
   I don’t know if there is something wrong with my understanding. If so, I 
hope you can correct me. Thank you.






Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688136504


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java:
##
@@ -273,15 +280,19 @@ private void disposeSavepointQuietly(
 }
 }
 
-private void observeLatestSavepoint(
+private void observeLatestCheckpoint(

Review Comment:
   Yep, this will be the latest checkpoint retrieved via the Flink REST API. I 
have updated the log.






Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688126716


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java:
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import 
org.apache.flink.kubernetes.operator.observer.snapshot.StateSnapshotObserver;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotReconciler;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
+
+import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Controller that runs the main reconcile loop for {@link 
FlinkStateSnapshot}. */
+@ControllerConfiguration
+public class FlinkStateSnapshotController
+implements Reconciler,
+ErrorStatusHandler,
+EventSourceInitializer,
+Cleaner {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FlinkStateSnapshotController.class);
+
+private final Set validators;
+private final FlinkResourceContextFactory ctxFactory;
+private final StateSnapshotReconciler reconciler;
+private final StateSnapshotObserver observer;
+private final EventRecorder eventRecorder;
+private final StatusRecorder 
statusRecorder;
+
+public FlinkStateSnapshotController(
+Set validators,
+FlinkResourceContextFactory ctxFactory,
+StateSnapshotReconciler reconciler,
+StateSnapshotObserver observer,
+EventRecorder eventRecorder,
+StatusRecorder 
statusRecorder) {
+this.validators = validators;
+this.ctxFactory = ctxFactory;
+this.reconciler = reconciler;
+this.observer = observer;
+this.eventRecorder = eventRecorder;
+this.statusRecorder = statusRecorder;
+}
+
+@Override
+public UpdateControl reconcile(
+FlinkStateSnapshot flinkStateSnapshot, Context 
josdkContext) {
+// status might be null here
+flinkStateSnapshot.setStatus(
+Objects.requireNonNullElseGet(
+flinkStateSnapshot.getStatus(), 
FlinkStateSnapshotStatus::new));
+var ctx = ctxFactory.getFlinkStateSnapshotContext(flinkStateSnapshot, 
josdkContext);
+
+observer.observe(ctx);
+
+if (validateSnapshot(ctx)) {
+reconciler.reconcile(ctx);
+}
+
+notifyListeners(ctx);
+return getUpdateControl(ctx);
+}
+
+@Override
+public DeleteControl cleanup(
+FlinkStateSnapshot flinkStateSnapshot, Context 
josdkContext) {
+var ctx 

Re: [PR] [FLINK-35292] Set dummy savepoint path during last-state upgrade [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


gyfora commented on code in PR #849:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/849#discussion_r1688103240


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -265,7 +274,7 @@ protected void restoreJob(
 throws Exception {
 Optional savepointOpt = Optional.empty();
 
-if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+if (spec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT) {

Review Comment:
   The problem is that this cannot be done easily, because which savepoint is 
passed there depends on where and why deploy is called. For example, it 
differs between initial deployments and upgrades.
   
   While the optional parameter is not great, I would like to avoid adding more 
complexity to the deploy method. It may make sense to break the deploy method 
up into three parts (stateless, last-state, savepoint), which would get rid of 
the optional params and the requireHaMetadata flag at the same time, at the 
expense of more methods.
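   
   For illustration only, a minimal sketch of what such a three-way split 
could look like. All class and method names are hypothetical and not taken 
from the operator code base, and the mapping of last-state to 
requireHaMetadata=true is an assumption made for the example.

```java
import java.util.Optional;

// Hypothetical sketch; these names are illustrative and not taken from the operator.
class DeploySplitSketch {

    // Current shape: one method whose behavior depends on an optional
    // savepoint and a boolean flag.
    static String deploy(Optional<String> savepoint, boolean requireHaMetadata) {
        return "savepoint=" + savepoint.orElse("none")
                + ", requireHaMetadata=" + requireHaMetadata;
    }

    // Alternative shape: one entry point per upgrade mode, so callers never
    // pass an Optional or a flag themselves.
    static String deployStateless() {
        return deploy(Optional.empty(), false);
    }

    static String deployLastState() {
        // Assumption for this sketch: last-state restores rely on HA metadata.
        return deploy(Optional.empty(), true);
    }

    static String deployFromSavepoint(String savepointPath) {
        return deploy(Optional.of(savepointPath), false);
    }
}
```

   The trade-off is the one named in the comment: more methods, but each call 
site states its upgrade mode explicitly.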
   
   






Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688104509


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.JobKind;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Optional;
+
+/** Context for reconciling a snapshot. */
+@Getter
+@RequiredArgsConstructor
+public class FlinkStateSnapshotContext {
+
+private final FlinkStateSnapshot resource;
+private final FlinkStateSnapshotStatus originalStatus;
+private final Context josdkContext;
+private final FlinkConfigManager configManager;
+
+private FlinkOperatorConfiguration operatorConfig;
+private Configuration referencedJobObserveConfig;
+private FlinkDeployment referencedJobFlinkDeployment;
+
+/**
+ * @return Operator configuration for this resource.
+ */
+public FlinkOperatorConfiguration getOperatorConfig() {
+if (operatorConfig != null) {
+return operatorConfig;
+}
+return operatorConfig =
+configManager.getOperatorConfiguration(
+getResource().getMetadata().getNamespace(), null);

Review Comment:
   I have flipped the ifs for now; I will come back to check whether a Lombok 
lazy getter can be used here.
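   
   As a rough sketch of the "flipped" form (compute on first access, then 
return the cached value), written in plain Java. The class name, the 
`Supplier`-based loader and the `loads` counter are assumptions made for the 
example. Lombok's `@Getter(lazy = true)` generates a comparable cached getter 
for a `private final` field with an initializer, and unlike this sketch it is 
thread-safe.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a lazily initialized getter; not the operator's code.
class LazyConfigSketch {

    private final Supplier<String> loader; // stands in for the config manager lookup
    private String operatorConfig;         // stays null until first access
    int loads = 0;                         // counts how often the loader ran

    LazyConfigSketch(Supplier<String> loader) {
        this.loader = loader;
    }

    String getOperatorConfig() {
        // "Flipped" check: initialize when missing, then fall through to one return.
        if (operatorConfig == null) {
            loads++;
            operatorConfig = loader.get();
        }
        return operatorConfig;
    }
}
```

   Note that this version is not thread-safe; it only shows the control flow.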






Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688100790


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##
@@ -426,6 +438,14 @@ public static String operatorConfigKey(String key) {
 .withDescription(
 "Max allowed checkpoint age for initiating 
last-state upgrades on running jobs. If a checkpoint is not available within 
the desired age (and nothing in progress) a savepoint will be triggered.");
 
+@Documentation.Section(SECTION_DYNAMIC)
+public static final ConfigOption 
OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE =
+operatorConfig("savepoint.dispose-on-delete")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Savepoint data for FlinkStateSnapshot resources 
created by the operator during upgrades and periodic savepoints will be 
disposed of automatically when the generated Kubernetes resource is deleted.");

Review Comment:
   I believe it's the correct way to say it (to dispose of something)






Re: [PR] [FLINK-32682] Make it possible to use query time based time functions in streaming mode [flink]

2024-07-23 Thread via GitHub


dawidwys closed pull request #23083: [FLINK-32682] Make it possible to use 
query time based time functions in streaming mode
URL: https://github.com/apache/flink/pull/23083





Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1688087051


##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java:
##
@@ -90,8 +100,8 @@ public class JobSpec implements Diffable {
 
 /**
  * Nonce used to trigger a full redeployment of the job from the savepoint 
path specified in
- * initialSavepointPath. In order to trigger redeployment, change the 
number to a different
- * non-null value. Rollback is not possible after redeployment.
+ * initialSavepointPath or initialSavepointName. In order to trigger 
redeployment, change the

Review Comment:
   Forgot to rename it






Re: [PR] [FLINK-35776] Simplify job status handling [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


gyfora merged PR #851:
URL: https://github.com/apache/flink-kubernetes-operator/pull/851





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-23 Thread via GitHub


hlteoh37 commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2245153099

   @19priyadhingra The tests seem to be failing. Can we please take a look?
   
   Also - it would be good if we squash the commits!





Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


gyfora commented on code in PR #855:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/855#discussion_r1687973449


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##
@@ -289,6 +289,8 @@ public boolean 
reconcileOtherChanges(FlinkResourceContext ctx) throws Except
 LOG.info("Stopping failed Flink job...");
 cleanupAfterFailedJob(ctx);
 status.setError(null);
+ReconciliationUtils.updateStatusForDeployedSpec(
+ctx.getResource(), 
ctx.getDeployConfig(ctx.getResource().getSpec()), clock);

Review Comment:
   I don't really understand how this would solve the issue...
   Could you please explain why you think it solves it?






Re: [PR] [FLINK-35242] Supports per-SE type configuration & "lenient" evolution behavior [flink-cdc]

2024-07-23 Thread via GitHub


leonardBang commented on code in PR #3339:
URL: https://github.com/apache/flink-cdc/pull/3339#discussion_r1687943729


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java:
##
@@ -30,5 +33,5 @@ public interface DataSink {
 EventSinkProvider getEventSinkProvider();
 
 /** Get the {@link MetadataApplier} for applying metadata changes to 
external systems. */
-MetadataApplier getMetadataApplier();
+MetadataApplier getMetadataApplier(Set 
enabledEventTypes);

Review Comment:
   This is a public API; we should keep it backward compatible.



##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java:
##
@@ -19,13 +19,22 @@
 
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 
 import java.io.Serializable;
+import java.util.Set;
 
 /** {@code MetadataApplier} is used to apply metadata changes to external 
systems. */
 @PublicEvolving
 public interface MetadataApplier extends Serializable {
 
+/** Checks if this metadata applier should handle this event type. */
+boolean acceptsSchemaEvolutionType(SchemaChangeEventType 
schemaChangeEventType);
+
+/** Checks what kind of schema change events downstream can handle. */
+Set getSupportedSchemaEvolutionTypes();
+

Review Comment:
   Please consider compatibility when changing an existing public API. Imagine 
a user who has a custom pipeline connector: will it still work after they 
bump the Flink CDC version?
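   
   One common way to keep existing implementations compiling is to give newly 
added interface methods a `default` body. The following is a minimal sketch 
under that assumption; the `*Sketch` types, the enum values and the permissive 
defaults are hypothetical and are not the Flink CDC API.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for a schema change event type.
enum EventTypeSketch { ADD_COLUMN, DROP_COLUMN, RENAME_COLUMN }

// Hypothetical stand-in for a public interface that gains two new methods.
interface MetadataApplierSketch {

    // Pre-existing abstract method that old implementations already provide.
    void applySchemaChange(String event);

    // New methods get default bodies, so implementations written against the
    // old interface still compile and keep their previous behavior.
    default boolean acceptsSchemaEvolutionType(EventTypeSketch type) {
        return true;
    }

    default Set<EventTypeSketch> getSupportedSchemaEvolutionTypes() {
        return EnumSet.allOf(EventTypeSketch.class);
    }
}

// A "custom connector" written before the new methods existed.
class LegacyApplierSketch implements MetadataApplierSketch {
    @Override
    public void applySchemaChange(String event) {
        // no-op for the sketch
    }
}
```

   Whether "accept everything" is the right default is a design decision for 
the PR; the sketch only shows that the old implementation still compiles.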



##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.exceptions;
+
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+
+import javax.annotation.Nullable;
+
+/** An exception occurred during schema evolution. */
+public class SchemaEvolveException extends Exception {

Review Comment:
   Should this extend `FlinkRuntimeException` or `CDCRuntimeException`?



##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSink.java:
##
@@ -74,7 +76,7 @@ public EventSinkProvider getEventSinkProvider() {
 }
 
 @Override
-public MetadataApplier getMetadataApplier() {
-return new DorisMetadataApplier(dorisOptions, configuration);
+public MetadataApplier getMetadataApplier(Set 
enabledEventTypes) {
+return new DorisMetadataApplier(dorisOptions, configuration, 
enabledEventTypes);
 }

Review Comment:
   You need to change all connectors in one PR if you introduce an incompatible 
change to a public API; we should avoid this kind of change.



##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java:
##
@@ -22,7 +22,9 @@
 /** Behavior for handling schema changes. */
 @PublicEvolving
 public enum SchemaChangeBehavior {
-EVOLVE,
 IGNORE,

Review Comment:
   You can add a note about the order adjustment.



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/metrics/SchemaOperatorMetrics.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions 

[PR] [FLINK-35884] MySQL pipeline support snapshot chunk key-column [flink-cdc]

2024-07-23 Thread via GitHub


beryllw opened a new pull request, #3490:
URL: https://github.com/apache/flink-cdc/pull/3490

   The `MySqlSourceOptions` of flink-connector-mysql-cdc support the 
`scan.incremental.snapshot.chunk.key-column` parameter for dividing chunks; the 
pipeline connector should support it as well.





Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2245079611

   I'll investigate this.





Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-23 Thread via GitHub


ferenc-csaky commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1687799832


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##
@@ -426,6 +438,14 @@ public static String operatorConfigKey(String key) {
 .withDescription(
 "Max allowed checkpoint age for initiating 
last-state upgrades on running jobs. If a checkpoint is not available within 
the desired age (and nothing in progress) a savepoint will be triggered.");
 
+@Documentation.Section(SECTION_DYNAMIC)
+public static final ConfigOption 
OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE =
+operatorConfig("savepoint.dispose-on-delete")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Savepoint data for FlinkStateSnapshot resources 
created by the operator during upgrades and periodic savepoints will be 
disposed of automatically when the generated Kubernetes resource is deleted.");

Review Comment:
   nit: "...savepoints will be disposed ~of~ automatically when..."



##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkStateSnapshotState.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.status;
+
+import org.apache.flink.annotation.Experimental;
+
+/** Describes current snapshot state. */
+@Experimental
+public enum FlinkStateSnapshotState {

Review Comment:
   Maybe move this inside `FlinkStateSnapshotStatus`; the enum name could then 
simply be `State`. This might improve overall readability, and we can work 
around the somewhat confusing wording here a bit better.
   
   I see that `ReconciliationStatus` and `ReconciliationState` follow the 
current setup and this is just set up the same way, but the wording is 
straightforward there.
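   
   A minimal sketch of the nested-enum layout suggested above. The class name 
and the state values are placeholders chosen for the example; they are not the 
values used in the PR.

```java
// Hypothetical sketch; the state names below are placeholders.
class SnapshotStatusSketch {

    /** Nested enum: referenced from outside as SnapshotStatusSketch.State. */
    enum State { PENDING, IN_PROGRESS, DONE }

    private State state = State.PENDING;

    State getState() {
        return state;
    }

    void setState(State state) {
        this.state = state;
    }
}
```

   Call sites then read `SnapshotStatusSketch.State.DONE`, which keeps the 
words "status" and "state" from appearing as two sibling top-level types.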



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##
@@ -297,6 +297,18 @@ public static String operatorConfigKey(String key) {
 "Custom HTTP header for HttpArtifactFetcher. The 
header will be applied when getting the session job artifacts. "
 + "Expected format: 
headerKey1:headerValue1,headerKey2:headerValue2.");
 
+@Documentation.Section(SECTION_DYNAMIC)
+public static final ConfigOption SNAPSHOT_RESOURCE_ENABLED =
+operatorConfig("snapshot.resource.enabled")
+.booleanType()
+.defaultValue(true)
+.withDescription(
+"Create new FlinkStateSnapshot resources for 
storing snapshots. "
++ "Disable if you wish to use the 
deprecated mode and save snapshot results to "
++ "FlinkDeployment/FlinkSessionJob status 
fields. The Operator will fallback to the "

Review Comment:
   nit: "The Operator will fallback to ~the~"



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java:
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+

Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]

2024-07-23 Thread via GitHub


leonardBang commented on code in PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489#discussion_r1687920162


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml:
##
@@ -69,7 +69,7 @@ limitations under the License.
 
 org.mongodb
 mongodb-driver-sync
-4.9.1
+5.1.1

Review Comment:
   Could you also update the docs? The older (release-3.1) docs also seem to 
use an incorrect driver version.
   
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/






Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-23 Thread via GitHub


leonardBang commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1687881675


##
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql:
##
@@ -0,0 +1,28 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--  http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+DROP TABLE IF EXISTS DATA_TYPES_TABLE;
+
+CREATE TABLE DATA_TYPES_TABLE (
+ID INT NOT NULL,
+TS DATETIME(0),
+NUM DECIMAL(10, 2),
+PRIMARY KEY (ID)
+);

Review Comment:
   The types are not that complex from my point of view; could you refer to 
`fullTypesTest` of the MySQL CDC Source?



##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java:
##
@@ -56,6 +67,181 @@ public static List 
createFieldGetters(List colum
 return fieldGetters;
 }
 
+/** Restore original data fields from RecordData structure. */
+public static List restoreOriginalData(
+@Nullable RecordData recordData, List 
fieldGetters) {
+if (recordData == null) {
+return Collections.emptyList();
+}
+List actualFields = new ArrayList<>();
+for (RecordData.FieldGetter fieldGetter : fieldGetters) {
+actualFields.add(fieldGetter.getFieldOrNull(recordData));
+}
+return actualFields;
+}
+
+/** Merge compatible upstream schemas. */
+public static Schema mergeCompatibleSchemas(List schemas) {
+if (schemas.isEmpty()) {
+return null;
+} else if (schemas.size() == 1) {
+return schemas.get(0);
+} else {
+Schema outputSchema = null;
+for (Schema schema : schemas) {
+outputSchema = mergeSchema(outputSchema, schema);
+}
+return outputSchema;
+}
+}
+
+/** Try to combine two schemas with potential incompatible type. */
+@VisibleForTesting
+public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) 
{
+if (lSchema == null) {
+return rSchema;
+}
+if (lSchema.getColumnCount() != rSchema.getColumnCount()) {
+throw new IllegalStateException(
+String.format(
+"Unable to merge schema %s and %s with different 
column counts.",
+lSchema, rSchema));
+}
+if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) {
+throw new IllegalStateException(
+String.format(
+"Unable to merge schema %s and %s with different 
primary keys.",
+lSchema, rSchema));
+}
+if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) {
+throw new IllegalStateException(
+String.format(
+"Unable to merge schema %s and %s with different 
partition keys.",
+lSchema, rSchema));
+}
+if (!lSchema.options().equals(rSchema.options())) {
+throw new IllegalStateException(
+String.format(
+"Unable to merge schema %s and %s with different 
options.",
+lSchema, rSchema));
+}
+if (!Objects.equals(lSchema.comment(), rSchema.comment())) {
+throw new IllegalStateException(
+String.format(
+"Unable to merge schema %s and %s with different 
comments.",
+lSchema, rSchema));
+}
+
+List leftColumns = lSchema.getColumns();
+List rightColumns = rSchema.getColumns();
+
+List mergedColumns =
+IntStream.range(0, lSchema.getColumnCount())
+.mapToObj(i -> mergeColumn(leftColumns.get(i), 
rightColumns.get(i)))
+.collect(Collectors.toList());
+
+return lSchema.copy(mergedColumns);
+}
+
+/** Try to combine two columns with potential incompatible type. */
+@VisibleForTesting
+public static Column mergeColumn(Column lColumn, Column rColumn) {
+if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
+

Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-23 Thread via GitHub


flinkbot commented on PR #25113:
URL: https://github.com/apache/flink/pull/25113#issuecomment-2244834549

   
   ## CI report:
   
   * 7e539f5fc908f38eebba16f9480bcf3f5d5009c3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-23 Thread via GitHub


RocMarshal opened a new pull request, #25113:
URL: https://github.com/apache/flink/pull/25113

   
   
   
   ## What is the purpose of the change
   
   Support resource request wait mechanism at DefaultDeclarativeSlotPool side 
for Default Scheduler
   
   
   ## Brief change log
   
   - introduce slot.request.max-interval 
   - implement the mechanism.
   
   
   ## Verifying this change
   
   
   This change is already covered by added corresponding test cases.
   
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager 
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   





Re: [PR] [FLINK-35874][cdc-connector][mysql] Check pureBinlogPhaseTables set before call getBinlogPosition method [flink-cdc]

2024-07-23 Thread via GitHub


qiaozongmi commented on PR #3488:
URL: https://github.com/apache/flink-cdc/pull/3488#issuecomment-2244820245

   @leonardBang PTAL





Re: [PR] [FLINK-35877] Shade protobuf in flink [flink]

2024-07-23 Thread via GitHub


zhuanshenbsj1 commented on PR #25112:
URL: https://github.com/apache/flink/pull/25112#issuecomment-2244784197

   > Which part of the Flink runtime/system relies on Protobuf, that would 
justify shading Protobuf?
   
   This is mainly to avoid dependency conflicts between the Protobuf version 
used in the developer's code and the one used by Flink itself. Protobuf is 
very commonly used in development.





Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-07-23 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1687687114


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java:
##
@@ -69,46 +69,29 @@ class LocalInputPreferredSlotSharingStrategyTest {
 private TestingSchedulingExecutionVertex ev21;
 private TestingSchedulingExecutionVertex ev22;
 
-private Set slotSharingGroups;
-
-@BeforeEach
-void setUp() {
-topology = new TestingSchedulingTopology();
-
+private void setupCase() {
 ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0);
 ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1);
 
 ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0);
 ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1);
 
-final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
-slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_1);
-slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_2);
-slotSharingGroups = Collections.singleton(slotSharingGroup);
+slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_1);
+slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_2);
 }
 
-@Test
-void testCoLocationConstraintIsRespected() {
-topology.connect(ev11, ev22);
-topology.connect(ev12, ev21);
-
-final CoLocationGroup coLocationGroup =
-new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2);
-final Set coLocationGroups = 
Collections.singleton(coLocationGroup);
-
-final SlotSharingStrategy strategy =
-new LocalInputPreferredSlotSharingStrategy(
-topology, slotSharingGroups, coLocationGroups);
-
-assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2);
-
assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds())
-.contains(ev11.getId(), ev21.getId());
-
assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getExecutionVertexIds())
-.contains(ev12.getId(), ev22.getId());
+@Override
+protected SlotSharingStrategy getSlotSharingStrategy(
+SchedulingTopology topology,
+Set slotSharingGroups,
+Set coLocationGroups) {
+return new LocalInputPreferredSlotSharingStrategy(
+topology, slotSharingGroups, coLocationGroups);
 }
 
 @Test
 void testInputLocalityIsRespectedWithRescaleEdge() {
+setupCase();

Review Comment:
   It's a little weird to call `setupCase` in so many tests.
   
   Could we define a `protected abstract void doSetup();` in 
`AbstractSlotSharingStrategyTest` and have `setUp` call `doSetup()`?
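
   A minimal sketch of that suggestion, using hypothetical stand-in classes 
rather than the real Flink test classes. The base class owns the setup 
entry point (the `@BeforeEach` method in the real test) and each subclass only 
supplies its own fixture through `doSetup()`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AbstractSlotSharingStrategyTest; not the real Flink class.
abstract class AbstractStrategyTestSketch {
    protected final List<String> events = new ArrayList<>();

    // In the real test this would be the @BeforeEach method.
    final void setUp() {
        events.add("common setup");
        doSetup();
    }

    // Each concrete test class supplies its own fixture here.
    protected abstract void doSetup();
}

// Hypothetical stand-in for LocalInputPreferredSlotSharingStrategyTest.
class LocalInputPreferredTestSketch extends AbstractStrategyTestSketch {
    @Override
    protected void doSetup() {
        events.add("create ev11, ev12, ev21, ev22");
    }
}
```

   With this shape, individual tests no longer need to call `setupCase()` 
themselves.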



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * 

Re: [PR] [FLINK-35623] Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0 [flink-connector-mongodb]

2024-07-23 Thread via GitHub


Jiabao-Sun merged PR #36:
URL: https://github.com/apache/flink-connector-mongodb/pull/36





Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244705856

   > Shall we bump the driver version from 4.7.1 to 5.1.1 as well?
   
   Done, bumped `mongo-kafka` version, too.





Re: [PR] [FLINK-35868][cdc-connector][mongodb] Add MongoDB 6.0 & 7.0 tests [flink-cdc]

2024-07-23 Thread via GitHub


Jiabao-Sun commented on PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244672599

   Shall we bump the driver version from 4.7.1 to 5.1.1 as well?





Re: [PR] [FLINK-35623] Bump mongo-driver version from 4.7.2 to 5.1.1 to support MongoDB 7.0 [flink-connector-mongodb]

2024-07-23 Thread via GitHub


Jiabao-Sun commented on PR #36:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/36#issuecomment-2244667146

   Hi @yux, could you help review this?





Re: [PR] [FLINK-35737] Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown [flink]

2024-07-23 Thread via GitHub


fengjiajie commented on PR #25009:
URL: https://github.com/apache/flink/pull/25009#issuecomment-2244648203

   Hi @Samrat002, I was wondering if you had a chance to take another look at 
this PR. Please let me know if any further changes are needed. Thanks!





Re: [PR] [FLINK-35877] Shade protobuf in flink [flink]

2024-07-23 Thread via GitHub


flinkbot commented on PR #25112:
URL: https://github.com/apache/flink/pull/25112#issuecomment-2244623653

   
   ## CI report:
   
   * 8a98ca6989b8a1192f72c4d0fd58529e99f9bfd5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-23 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1687672586


##
flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilderTest.java:
##
@@ -25,14 +25,6 @@
 /** Covers construction, defaults and sanity checking of {@link 
SqsSinkBuilder}. */
 class SqsSinkBuilderTest {
 
-@Test
-void elementConverterOfSinkMustBeSetWhenBuilt() {
-Assertions.assertThatExceptionOfType(NullPointerException.class)
-.isThrownBy(() -> 
SqsSink.builder().setSqsUrl("sqlUrl").build())
-.withMessageContaining(
-"No SerializationSchema was supplied to the SQS Sink 
builder.");
-}
-

Review Comment:
   Why did we remove this unit test?






[PR] [FLINK-35877] Shade protobuf in flink [flink]

2024-07-23 Thread via GitHub


zhuanshenbsj1 opened a new pull request, #25112:
URL: https://github.com/apache/flink/pull/25112

   
   
   ## What is the purpose of the change
   
   Shade the classes in protobuf to avoid class conflict.
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[PR] [FLINK-35868][mongo] Add MongoDB 6.0 & 7.0 tests [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian opened a new pull request, #3489:
URL: https://github.com/apache/flink-cdc/pull/3489

   This closes FLINK-35868.
   
   It allows Mongo CDC test cases to run on 6.0.16 & 7.0.12 (and legacy 5.0.2).
   
   Notice: JUnit doesn't allow parameterized `@ClassRule` or static 
fields, so containers must be created / destroyed before / after each test 
case, which significantly slows down testing (~50 min.). More discussion 
is required on this.





Re: [PR] [FLINK-35868][mongo] Add MongoDB 6.0 & 7.0 tests [flink-cdc]

2024-07-23 Thread via GitHub


yuxiqian commented on PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489#issuecomment-2244493624

   @leonardBang @Jiabao-Sun PTAL





Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]

2024-07-23 Thread via GitHub


XComp commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1687470490


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##
@@ -83,9 +83,15 @@ public Collection getJobManagerRunners() {
 }
 
 @Override
-public CompletableFuture localCleanupAsync(JobID jobId, Executor 
unusedExecutor) {
+public CompletableFuture localCleanupAsync(JobID jobId, Executor 
mainThreadExecutor) {
 if (isRegistered(jobId)) {
-return unregister(jobId).closeAsync();
+CompletableFuture resultFuture = 
this.jobManagerRunners.get(jobId).closeAsync();
+
+return resultFuture.thenApply(

Review Comment:
   ```suggestion
   return resultFuture.thenApplyAsync(
   ```



##
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##
@@ -30,16 +30,17 @@
 /** {@code TestingResourceCleanerFactory} for adding custom {@link 
ResourceCleaner} creation. */
 public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
 
-private final Collection 
locallyCleanableResources;
+private final Collection
+locallyCleanableInMainThreadResource;
 private final Collection 
globallyCleanableResources;
 
 private final Executor cleanupExecutor;
 
 private TestingResourceCleanerFactory(
-Collection locallyCleanableResources,
+Collection 
locallyCleanableInMainThreadResource,

Review Comment:
   `LocallyCleanableInMainThreadResource` is an implementation detail of the 
`DispatcherResourceCleanerFactory`. We don't have to use it in 
`TestingResourceCleanerFactory`.



##
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##


Review Comment:
   I'm wondering whether you could come up with a test where we verify that the 
close is called in another thread and the unregister is then executed in the 
main thread.
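
   A rough sketch of what such a check could look like, assuming two 
hypothetical single-thread executors named `io-thread` and `main-thread` as 
stand-ins for the real ones. With plain `thenApply`, the callback runs on 
whichever thread completes the future (or on the caller if it is already 
complete); `thenApplyAsync` with an explicit executor pins it to that executor:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CleanupThreadSketch {
    // Returns {thread that ran the close step, thread that ran the unregister step}.
    static String[] runCleanup() {
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService mainThreadExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            // Stand-in for closeAsync() completing off the main thread.
            CompletableFuture<String> closeFuture =
                    CompletableFuture.supplyAsync(
                            () -> Thread.currentThread().getName(), ioExecutor);

            // Stand-in for the unregister step, forced onto the main thread executor.
            CompletableFuture<String[]> result =
                    closeFuture.thenApplyAsync(
                            closeThread ->
                                    new String[] {closeThread, Thread.currentThread().getName()},
                            mainThreadExecutor);
            return result.join();
        } finally {
            ioExecutor.shutdown();
            mainThreadExecutor.shutdown();
        }
    }
}
```

   A real test would record the thread names inside the runner's close logic 
and the registry's unregister logic instead of returning them.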



##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableInMainThreadResource.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code LocallyCleanableInMainThreadResource} is supposed to be implemented 
by any class that
+ * provides artifacts for a given job that need to be cleaned up in the main 
thread after the job
+ * reached a local terminal state. Local cleanups that needs to be triggered 
for a global terminal
+ * state as well, need to be implemented using the {@link 
GloballyCleanableResource}.
+ *
+ * The {@link DispatcherResourceCleanerFactory} provides a workaround to 
trigger some {@code
+ * LocallyCleanableInMainThreadResource} as globally cleanable. FLINK-26175 is 
created to cover a
+ * refactoring and straighten things out.
+ *
+ * @see org.apache.flink.api.common.JobStatus
+ */
+@FunctionalInterface
+public interface LocallyCleanableInMainThreadResource {
+
+/**
+ * {@code localCleanupAsync} is expected to be called from the main thread 
and uses the passed
+ * {@code mainThreadExecutor} for cleanups. Thread-safety must be ensured.
+ *
+ * @param jobId The {@link JobID} of the job for which the local data 
should be cleaned up.
+ * @param mainThreadExecutor The main thread executor.
+ * @return The cleanup result future.
+ */
+CompletableFuture localCleanupAsync(JobID jobId, Executor 
mainThreadExecutor);

Review Comment:
   We shouldn't remove the `ioExecutor` from the signature because we still 
need to translate `LocallyCleanableInMainThreadResource` into 
`LocallyCleanableResource` (see the proposal in [my 
comment](https://github.com/apache/flink/pull/25027#discussion_r1683903323)).



##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java:
##
@@ -126,8 +129,24 @@ JOB_MANAGER_METRIC_GROUP_LABEL, 

Re: [PR] [BP-1.20][FLINK-35095][test] Fix unstable tests in `ExecutionEnvironmentImplTest` [flink]

2024-07-22 Thread via GitHub


flinkbot commented on PR #25111:
URL: https://github.com/apache/flink/pull/25111#issuecomment-2244299273

   
   ## CI report:
   
   * 599839b334cb52e6159912ccb2162e314ce38198 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [BP-1.20][FLINK-35095][test] Fix unstable tests in `ExecutionEnvironmentImplTest` [flink]

2024-07-22 Thread via GitHub


reswqa opened a new pull request, #25111:
URL: https://github.com/apache/flink/pull/25111

   (no comment)





Re: [PR] [FLINK-23598] [core] Fix a bug that caused position to be updated twice when writing a string to a byte array [flink]

2024-07-22 Thread via GitHub


sn-12-3 commented on PR #23563:
URL: https://github.com/apache/flink/pull/23563#issuecomment-2244234183

   Hello, could someone please clarify whether this PR requires any further 
modifications? It has been open for quite some time now.





Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]

2024-07-22 Thread via GitHub


leonardBang commented on PR #2944:
URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2244218226

   @lvyanquan Could you check the failed tests? It's also recommended to rebase 
onto master.





Re: [PR] [FLINK-35713][cdc-compose] Add sink PARALLELISM for flink-cdc. [flink-cdc]

2024-07-22 Thread via GitHub


ruanhang1993 commented on code in PR #3438:
URL: https://github.com/apache/flink-cdc/pull/3438#discussion_r1687392535


##
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java:
##
@@ -93,6 +93,7 @@ private FlinkPipelineComposer(StreamExecutionEnvironment env, 
boolean isBlocking
 @Override
 public PipelineExecution compose(PipelineDef pipelineDef) {
 int parallelism = 
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
+int sinkParallelism = 
pipelineDef.getConfig().get(PipelineOptions.SINK_PARALLELISM);

Review Comment:
   What if users do not set the SINK_PARALLELISM setting?
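
   To illustrate the concern: the option is declared with `noDefaultValue()`, 
so if the lookup yields `null` when the key is unset, assigning it to an `int` 
fails on auto-unboxing. A minimal sketch, using a plain `Map` as a hypothetical 
stand-in for the pipeline configuration, and assuming that falling back to the 
pipeline parallelism is the desired behaviour:

```java
import java.util.Map;

class SinkParallelismSketch {
    // Hypothetical stand-in for a config lookup that yields null when the key is unset.
    static Integer lookup(Map<String, Integer> config, String key) {
        return config.get(key);
    }

    // Mirrors the pattern under review: auto-unboxing null throws a NullPointerException.
    static int unsafeSinkParallelism(Map<String, Integer> config) {
        return lookup(config, "sink.parallelism");
    }

    // Falls back to the pipeline parallelism when sink.parallelism is not configured.
    static int resolveSinkParallelism(Map<String, Integer> config, int pipelineParallelism) {
        Integer sinkParallelism = lookup(config, "sink.parallelism");
        return sinkParallelism != null ? sinkParallelism : pipelineParallelism;
    }
}
```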



##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##
@@ -45,6 +45,15 @@ public class PipelineOptions {
 .noDefaultValue()
 .withDescription("Parallelism of the pipeline");
 
+
+
+public static final ConfigOption SINK_PARALLELISM =
+ConfigOptions.key("sink.parallelism")
+.intType()
+.noDefaultValue()
+.withDescription("Parallelism of the sink in the 
pipeline");

Review Comment:
   Please add some tests for the setting. 






Re: [PR] [FLINK-31931][runtime] Prevent exception history page from linking to non-existent TM page. [flink]

2024-07-22 Thread via GitHub


JunRuiLee closed pull request #22542: [FLINK-31931][runtime] Prevent exception 
history page from linking to non-existent TM page.
URL: https://github.com/apache/flink/pull/22542





Re: [PR] [minor][cdc-connector][oracle] OracleSchema getTableSchema Description Modified [flink-cdc]

2024-07-22 Thread via GitHub


leonardBang merged PR #3443:
URL: https://github.com/apache/flink-cdc/pull/3443





Re: [PR] [FLINK-35766][streaming] Fix hang issue in JobGraph generation when StreamGraph contains many YieldingOperatorFactory. [flink]

2024-07-22 Thread via GitHub


flinkbot commented on PR #25110:
URL: https://github.com/apache/flink/pull/25110#issuecomment-2244169719

   
   ## CI report:
   
   * e83bc90e32841a76be85d96d25e1098e9276c50a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-35766][streaming] Fix hang issue in JobGraph generation when StreamGraph contains many YieldingOperatorFactory. [flink]

2024-07-22 Thread via GitHub


JunRuiLee opened a new pull request, #25110:
URL: https://github.com/apache/flink/pull/25110

   
   
   
   
   ## What is the purpose of the change
   
   Fix hang issue in JobGraph generation when StreamGraph contains many 
YieldingOperatorFactory.
   
   
   ## Brief change log
   
   Add a cache to store the mapping between StreamNode and its head operator
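
   The change itself is not quoted in this message, so the following is only a 
generic memoization sketch of the idea, with hypothetical names and integer 
node ids rather than the real `StreamNode` types. Each node's head is resolved 
by walking upstream once, then served from the cache:

```java
import java.util.HashMap;
import java.util.Map;

class HeadOperatorCacheSketch {
    // node id -> chained upstream node id; a node without an entry is a chain head.
    private final Map<Integer, Integer> upstream;
    private final Map<Integer, Integer> headCache = new HashMap<>();
    // Counts how many lookups actually had to walk the chain.
    int uncachedLookups = 0;

    HeadOperatorCacheSketch(Map<Integer, Integer> upstream) {
        this.upstream = upstream;
    }

    int headOf(int nodeId) {
        Integer cached = headCache.get(nodeId);
        if (cached != null) {
            return cached;
        }
        uncachedLookups++;
        Integer parent = upstream.get(nodeId);
        int head = (parent == null) ? nodeId : headOf(parent);
        // Plain get/put instead of computeIfAbsent, because the computation recurses into the map.
        headCache.put(nodeId, head);
        return head;
    }
}
```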
   
   
   
   ## Verifying this change
   
   
   
   This change added tests and can be verified by 
StreamingJobGraphGeneratorTest#testJobGraphGenerationWithManyYieldingOperatorsDoesNotHang.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   





Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]

2024-07-22 Thread via GitHub


shiyiky commented on code in PR #3432:
URL: https://github.com/apache/flink-cdc/pull/3432#discussion_r1687363640


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java:
##
@@ -240,12 +240,15 @@ private static Map querySystemVariables(
 return variables;
 }
 
-public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection 
connection) {
+public static BinlogOffset findBinlogOffset(
+long targetMs, MySqlConnection connection, MySqlSourceConfig 
mySqlSourceConfig) {

Review Comment:
   At first I thought so too, but since this is a method in a util class, it 
may change later, and passing the config class means that any further 
attributes needed in the future are already available.






Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]

2024-07-22 Thread via GitHub


whhe commented on code in PR #3432:
URL: https://github.com/apache/flink-cdc/pull/3432#discussion_r1687359553


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java:
##
@@ -240,12 +240,15 @@ private static Map querySystemVariables(
 return variables;
 }
 
-public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection 
connection) {
+public static BinlogOffset findBinlogOffset(
+long targetMs, MySqlConnection connection, MySqlSourceConfig 
mySqlSourceConfig) {

Review Comment:
   How about passing the serverId directly? In this way the method signature is 
also clearer and easier to understand.






Re: [PR] [FLINK-35865][base] Support Byte and Short in ObjectUtils [flink-cdc]

2024-07-22 Thread via GitHub


GOODBOY008 merged PR #3481:
URL: https://github.com/apache/flink-cdc/pull/3481





Re: [PR] [cdc-runtime]SchemaRegistry Log output is repeated [flink-cdc]

2024-07-22 Thread via GitHub


ChengJie1053 commented on PR #3487:
URL: https://github.com/apache/flink-cdc/pull/3487#issuecomment-2244140620

   @PatrickRen Hi, could you take a look at this code for me?
   





Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]

2024-07-22 Thread via GitHub


whhe commented on PR #3432:
URL: https://github.com/apache/flink-cdc/pull/3432#issuecomment-2244137919

   > Maybe we should consider fixing the BinaryLogClient with LifecycleListener 
in a new PR, because another PR should be fixed as well: 
https://github.com/apache/flink-cdc/pull/1915/
   
   +1, creating a new pr for it makes sense to me.





Re: [PR] [FLINK-35363] FLIP-449: Reorganization of flink-connector-jdbc [flink-connector-jdbc]

2024-07-22 Thread via GitHub


RocMarshal commented on PR #123:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/123#issuecomment-2244122538

   > Note: Before the merge, we can consider removing the 
backward-compatibility module (and workflow) to avoid generating the jar, as it 
was only meant to be a helper.
   
   Hi, @eskabetxe 
   Considering that this is a relatively large refactoring, retaining this 
module could also help developers quickly verify and fix compatibility-related 
issues.
   How about keeping it for one or two more release cycles? In other 
words, keep this module until the refactored code becomes stable and mature.
   Please let me know what you think.





Re: [PR] [hotfix][docs] Add missing "Submitting a Flink job" section in Chinese version doc and other fixes [flink]

2024-07-22 Thread via GitHub


showuon commented on PR #24952:
URL: https://github.com/apache/flink/pull/24952#issuecomment-2244119347

   The CI test passed. Thanks.





Re: [PR] [hotfix] [docs] Fix java doc api [flink]

2024-07-22 Thread via GitHub


reswqa merged PR #25101:
URL: https://github.com/apache/flink/pull/25101





Re: [PR] [hotfix] [base] Fix the serialization error occurring with table names containing dot [flink-cdc]

2024-07-22 Thread via GitHub


leonardBang commented on code in PR #2705:
URL: https://github.com/apache/flink-cdc/pull/2705#discussion_r1687318951


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java:
##
@@ -199,7 +199,7 @@ public static void writeTableSchemas(
 boolean useCatalogBeforeSchema =
 
SerializerUtils.shouldUseCatalogBeforeSchema(entry.getKey());
 out.writeBoolean(useCatalogBeforeSchema);
-out.writeUTF(entry.getKey().toString());
+out.writeUTF(entry.getKey().toDoubleQuotedString());

Review Comment:
   @edmond-kk @loserwang1024 Is this a compatible change?






[PR] [cdc-runtime]SchemaRegistry Log output is repeated [flink-cdc]

2024-07-22 Thread via GitHub


ChengJie1053 opened a new pull request, #3487:
URL: https://github.com/apache/flink-cdc/pull/3487

   
![image](https://github.com/user-attachments/assets/0d8bbbf7-98d1-4ca2-896b-f0f27bc074ff)
   
   
![image](https://github.com/user-attachments/assets/60ba14d1-09e2-4cb8-b648-a857c76600c3)
   





Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]

2024-07-22 Thread via GitHub


shiyiky commented on PR #3432:
URL: https://github.com/apache/flink-cdc/pull/3432#issuecomment-2244093727

   Maybe we should consider fixing the BinaryLogClient with a LifecycleListener in 
a new PR, because another PR also needs to be fixed: 
https://github.com/apache/flink-cdc/pull/1915/





Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]

2024-07-22 Thread via GitHub


shiyiky commented on PR #3432:
URL: https://github.com/apache/flink-cdc/pull/3432#issuecomment-2244085865

   > I think adding a server id can indeed solve the issue, and a 
LifecycleListener is also needed for other exceptions. Can you add a basic 
implementation like `ReaderThreadLifecycleListener` please?
   
   Where can I add a basic implementation like `ReaderThreadLifecycleListener`?





Re: [PR] [hotfix] [base] Fix the serialization error occurring with table names containing dot [flink-cdc]

2024-07-22 Thread via GitHub


github-actions[bot] commented on PR #2705:
URL: https://github.com/apache/flink-cdc/pull/2705#issuecomment-2244013184

   This pull request has been automatically marked as stale because it has not 
had recent activity for 60 days. It will be closed in 30 days if no further 
activity occurs.





Re: [PR] [FLINK-31362][table] Upgrade Calcite version to 1.33.0 [flink]

2024-07-22 Thread via GitHub


snuyanzin commented on PR #24255:
URL: https://github.com/apache/flink/pull/24255#issuecomment-2243935706

   @flinkbot run azure





Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-22 Thread via GitHub


ferenc-csaky commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1686812531


##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/CheckpointSpec.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.spec;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Spec for checkpoint state snapshots. */
+@Experimental
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CheckpointSpec {
+/** Type of checkpoint to take. */
+private CheckpointType checkpointType = CheckpointType.FULL;

Review Comment:
   Due to [FLINK-33723](https://issues.apache.org/jira/browse/FLINK-33723), 
`INCREMENTAL` checkpoint triggering via REST API is disabled in Flink 1.19, so 
I think we may need to handle this on the operator side as well to not cause 
errors. Versions < 1.19 will work (although the actual checkpoint semantics 
might be undesired, see the linked jira for details).
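
A minimal sketch of what such an operator-side check might look like. Everything here is hypothetical and for illustration only: the class `CheckpointTypeGuardSketch`, the method `canTrigger`, and the local `CheckpointType` enum are stand-ins, not the operator's actual API. It only encodes the one case named above (`INCREMENTAL` on Flink 1.19); how later Flink versions should be treated is not stated in this comment, so they are left unrestricted here as an assumption.

```java
/** Hypothetical sketch; not part of the operator codebase. */
final class CheckpointTypeGuardSketch {

    /** Local stand-in for the operator's checkpoint type enum. */
    enum CheckpointType {
        FULL,
        INCREMENTAL
    }

    /**
     * Returns false only for INCREMENTAL on Flink 1.19, the case flagged in the review.
     * Versions below 1.19 are allowed, as the comment states. Versions above 1.19 are
     * also allowed here, which is an assumption of this sketch.
     */
    static boolean canTrigger(int flinkMajor, int flinkMinor, CheckpointType type) {
        boolean isFlink119 = flinkMajor == 1 && flinkMinor == 19;
        return !(isFlink119 && type == CheckpointType.INCREMENTAL);
    }
}
```

A caller could use the result to skip the trigger or to report a clear error instead of letting the REST call fail.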






Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-22 Thread via GitHub


ferenc-csaky commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1684358470


##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java:
##
@@ -90,8 +100,8 @@ public class JobSpec implements Diffable {
 
 /**
  * Nonce used to trigger a full redeployment of the job from the savepoint 
path specified in
- * initialSavepointPath. In order to trigger redeployment, change the 
number to a different
- * non-null value. Rollback is not possible after redeployment.
+ * initialSavepointPath or initialSavepointName. In order to trigger 
redeployment, change the

Review Comment:
   I might lack context here, but there is no `initialSavepointName` field in 
the whole codebase other than this comment and the generated doc from it.



##
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/CheckpointSpec.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.spec;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Spec for checkpoint state snapshots. */
+@Experimental
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CheckpointSpec {
+/** Type of checkpoint to take. */
+private CheckpointType checkpointType = CheckpointType.FULL;

Review Comment:
   Due to [FLINK-33723](https://issues.apache.org/jira/browse/FLINK-33723), 
`INCREMENTAL` checkpoint triggering is disabled in Flink 1.19, so I think we 
may need to handle this on the operator side as well to not cause errors. 
Versions < 1.19 will work (although the actual checkpoint semantics might be 
undesired, see the linked jira for details).






Re: [PR] [FLINK-35267][snapshot] FlinkStateSnapshot documentation and examples [flink-kubernetes-operator]

2024-07-22 Thread via GitHub


ferenc-csaky commented on code in PR #854:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/854#discussion_r1686476104


##
docs/content/docs/custom-resource/snapshots.md:
##
@@ -0,0 +1,117 @@
+---
+title: "Snapshots"
+weight: 3
+type: docs
+aliases:
+  - /custom-resource/snapshots.html
+---
+
+
+# Snapshots
+
+To create, list and delete snapshots you can use the custom resource called 
FlinkStateSnapshot. The operator will use the same controller flow as in the 
case of FlinkDeployment and FlinkSessionJob to trigger the savepoint/checkpoint 
and observe its status.
+
+This feature deprecates the old `savepointInfo` and `checkpointInfo` fields 
found in the Flink resource CR status, alongside with spec fields 
`initialSavepointPath`, `savepointTriggerNonce` and `checkpointTriggerNonce`. 
It is enabled by default by the configuration option 
`kubernetes.operator.snapshot.resource.enabled`. You can turn it off and the 
operator will keep using the deprecated status fields to track snapshots.

Review Comment:
   nit: As a rule of thumb, IMO it is good practice to start every sentence 
on a new line. MD will render it into one paragraph, and it keeps the text in a more 
readable format in most cases. If there is a really long sentence, it is probably 
worth considering rewording it anyway, so it can be a good indicator for 
that too. :)






Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]

2024-07-22 Thread via GitHub


kumar-mallikarjuna commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1686805757


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##
@@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) {
 return this.jobManagerRunners.remove(jobId);
 }
 
+public void setMainThreadExecutor(ComponentMainThreadExecutor executor) {
+mainThreadExecutor = executor;
+}
+
+public ComponentMainThreadExecutor getMainThreadExecutor() {
+return mainThreadExecutor;
+}
+

Review Comment:
   Done, LMK if you still want to collapse `DefaultJobManagerRunnerRegistry` 
and `OnMainThreadJobManagerRunnerRegistry`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##
@@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) {
 return this.jobManagerRunners.remove(jobId);
 }
 
+public void setMainThreadExecutor(ComponentMainThreadExecutor executor) {
+mainThreadExecutor = executor;
+}
+
+public ComponentMainThreadExecutor getMainThreadExecutor() {
+return mainThreadExecutor;
+}
+

Review Comment:
   Incorporated. LMK if you still want to collapse 
`DefaultJobManagerRunnerRegistry` and `OnMainThreadJobManagerRunnerRegistry`.






Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]

2024-07-22 Thread via GitHub


kumar-mallikarjuna commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1686804638


##
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##
@@ -164,7 +164,8 @@ void testFailingLocalCleanup() {
 .hasExactlyElementsOfTypes(ExecutionException.class, 
expectedException.getClass())
 .last()
 .isEqualTo(expectedException);
-
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+// Since the cleanup failed, the JobManagerRunner is expected to not 
have been unregistered.

Review Comment:
   Done, thanks!






Re: [PR] [FLINK-21373] Add RabbitMQ SinkV2 Implementation, Port Flink version to Flink 1.19 [flink-connector-rabbitmq]

2024-07-22 Thread via GitHub


rjbaucells commented on PR #29:
URL: 
https://github.com/apache/flink-connector-rabbitmq/pull/29#issuecomment-2243290547

   Hello,
   
   What is the status of this PR? 
   
   Thanks





Re: [PR] [FLINK-31215] [autoscaler] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators [flink-kubernetes-operator]

2024-07-22 Thread via GitHub


mxm commented on PR #847:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/847#issuecomment-2243227124

   > The `actual_target_data_rate_join` still can be greater than 
`target_data_rate_upstream_1` or `target_data_rate_upstream_2`. It means that 
the upstreams' target_data_rate remains unchanged.
   > 
   > Also, the `actual_target_data_rate_join` can be less than the 
target_data_rate of upstream, making them equal to 
`actual_target_data_rate_join`. But then the target_data_rate of the join will 
be two times greater than it was expected.
   > 
   > In both cases, the upstream_1 and upstream_2 operators will remain blocked 
after scaling. This is why the simpler approach may not be good enough.
   
   I think it can work if we apply the same logic that we used to determine 
`target_data_rate_join`. As you pointed out, we determined the target data rate 
via:
   
   `actual_data_rate_join = actual_data_rate_upstream_1 + 
actual_data_rate_upstream_2`
   
   Consequently, we would need to satisfy the following equation for the 
backpropagation:
   
   `target_data_rate_join = target_data_rate_upstream_1 + 
target_data_rate_upstream_2`
   
   That would mean that each input vertex gets the following limit applied:
   
   `actual_data_rate_upstream_i = target_data_rate_upstream_i - 
(target_data_rate_join - actual_target_data_rate_join) / N` 
   
   where `N` is the number of inputs.
   
   Do you think that would work? The benefit of this approach is that we 
leverage all the available information without having to add and backfeed 
additional factors.
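
The per-input limit above can be sketched as follows. This is only an illustration of the formula in this comment, not the autoscaler's implementation: the class and method names are hypothetical, and it assumes `target_data_rate_join` is the sum of the upstream targets, as in the equation above. It does not clamp negative results.

```java
import java.util.Arrays;

/** Hypothetical sketch of the proposed backpropagation formula. */
final class BackpropagationSketch {

    /**
     * Applies: limited_i = target_upstream_i - (target_join - actual_target_join) / N,
     * where target_join is taken to be the sum of all upstream targets.
     */
    static double[] limitUpstreamRates(double[] targetUpstreamRates, double actualTargetJoinRate) {
        int n = targetUpstreamRates.length;
        double targetJoinRate = Arrays.stream(targetUpstreamRates).sum();
        // The shortfall of the join is split evenly across its N inputs.
        double perInputReduction = (targetJoinRate - actualTargetJoinRate) / n;
        double[] limited = new double[n];
        for (int i = 0; i < n; i++) {
            limited[i] = targetUpstreamRates[i] - perInputReduction;
        }
        return limited;
    }
}
```

For example, with upstream targets of 100 and 200 (so `target_data_rate_join` is 300) and an `actual_target_data_rate_join` of 240, each input is reduced by 30, giving 70 and 170, which again sum to 240.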
   
   
   





Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]

2024-07-22 Thread via GitHub


kumar-mallikarjuna commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1686721979


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##
@@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) {
 return this.jobManagerRunners.remove(jobId);
 }
 
+public void setMainThreadExecutor(ComponentMainThreadExecutor executor) {
+mainThreadExecutor = executor;
+}
+
+public ComponentMainThreadExecutor getMainThreadExecutor() {
+return mainThreadExecutor;
+}
+

Review Comment:
   Ah I see! I was hesitant about this because that effectively means the 
`cleanupExecutor` for a `JobManagerRegistry` is always the 
`mainThreadExecutor`. But since `JobManagerRegistry`'s current implementations 
don't use the `cleanupExecutor` anyway, it's fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]

2024-07-22 Thread via GitHub


whhe commented on PR #3432:
URL: https://github.com/apache/flink-cdc/pull/3432#issuecomment-2242931869

   I think adding a server id can indeed solve the issue, and a 
LifecycleListener is also needed for other exceptions. Can you add a basic 
implementation like `ReaderThreadLifecycleListener` please?





[PR] [FLINK-35873][cdc-connector][paimon] Add HashFunctionProvider implementation for PaimonDataSink. [flink-cdc]

2024-07-22 Thread via GitHub


lvyanquan opened a new pull request, #3486:
URL: https://github.com/apache/flink-cdc/pull/3486

   A followup of https://github.com/apache/flink-cdc/pull/3414.





Re: [PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]

2024-07-22 Thread via GitHub


lsyldliu closed pull request #25108: [FLINK-35872][table] Fix the incorrect 
partition generation during period refresh in Full Mode
URL: https://github.com/apache/flink/pull/25108





Re: [PR] [FLINK-34877][cdc] Flink CDC pipeline transform supports type conversion [flink-cdc]

2024-07-22 Thread via GitHub


leonardBang commented on PR #3357:
URL: https://github.com/apache/flink-cdc/pull/3357#issuecomment-2242770101

   > Is it possible to add actual values conversion checks besides existing 
syntax parsing tests? And it should cover cases where `NULL` value is passed as 
conversion argument.
   
   @aiwenmo I think @yuxiqian's idea is reasonable. Could you add more value 
tests?





Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]

2024-07-22 Thread via GitHub


XComp commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1686411653


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##
@@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) {
 return this.jobManagerRunners.remove(jobId);
 }
 
+public void setMainThreadExecutor(ComponentMainThreadExecutor executor) {
+mainThreadExecutor = executor;
+}
+
+public ComponentMainThreadExecutor getMainThreadExecutor() {
+return mainThreadExecutor;
+}
+

Review Comment:
   JobManagerRunnerRegistry would implement 
`LocallyCleanableInMainThreadResource` instead of `LocallyCleanableResource`. 
The conversion of the `LocallyCleanableInMainThreadResource#localCleanupAsync` 
callback to the `LocallyCleanableResource#localCleanupAsync` callback would 
then happen in 
`DispatcherResourceCleanerFactory#create[Loc|Glob]alResourceCleaner`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##
@@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) {
 return this.jobManagerRunners.remove(jobId);
 }
 
+public void setMainThreadExecutor(ComponentMainThreadExecutor executor) {
+mainThreadExecutor = executor;
+}
+
+public ComponentMainThreadExecutor getMainThreadExecutor() {
+return mainThreadExecutor;
+}
+

Review Comment:
   `JobManagerRunnerRegistry` would implement 
`LocallyCleanableInMainThreadResource` instead of `LocallyCleanableResource`. 
The conversion of the `LocallyCleanableInMainThreadResource#localCleanupAsync` 
callback to the `LocallyCleanableResource#localCleanupAsync` callback would 
then happen in 
`DispatcherResourceCleanerFactory#create[Loc|Glob]alResourceCleaner`.
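
A minimal sketch of the conversion described above, assuming the two-executor and three-executor callback shapes discussed in this thread. The types below are local stand-ins so the example compiles on its own; they are not the Flink classes, and the helper name `toLocallyCleanableResource` is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Sketch only: local stand-ins, not the actual Flink interfaces. */
final class MainThreadCleanupAdapterSketch {

    /** Stand-in for Flink's JobID, to keep the example self-contained. */
    static final class JobID {
        final String value;

        JobID(String value) {
            this.value = value;
        }
    }

    @FunctionalInterface
    interface LocallyCleanableResource {
        CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor);
    }

    @FunctionalInterface
    interface LocallyCleanableInMainThreadResource {
        CompletableFuture<Void> localCleanupAsync(
                JobID jobId, Executor cleanupExecutor, Executor mainThreadExecutor);
    }

    /** Binds the main thread executor once, so callers only see the two-argument callback. */
    static LocallyCleanableResource toLocallyCleanableResource(
            LocallyCleanableInMainThreadResource resource, Executor mainThreadExecutor) {
        return (jobId, cleanupExecutor) ->
                resource.localCleanupAsync(jobId, cleanupExecutor, mainThreadExecutor);
    }
}
```

With an adapter like this, the main thread executor would only need to be known where the cleaner is created, and the registry would not need a setter for it.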






Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]

2024-07-22 Thread via GitHub


flinkbot commented on PR #25109:
URL: https://github.com/apache/flink/pull/25109#issuecomment-2242532958

   
   ## CI report:
   
   * fc3fe3cbe67d160f08dec878039f66d42a91b29d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]

2024-07-22 Thread via GitHub


dylanhz opened a new pull request, #25109:
URL: https://github.com/apache/flink/pull/25109

   
   
   ## What is the purpose of the change
   
   Add CONV function.
   Examples:
   ```SQL
   > SELECT CONV(7, 10, 2);
   111
   ```
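
For readers unfamiliar with base conversion, the example above can be reproduced with plain JDK calls. This is a rough sketch, not the implementation in this PR (which, as noted below, differs slightly from the Hive version): the class `ConvSketch` and its `conv` method are hypothetical, it handles only values that fit in a `long`, and the upper-case output for digits above 9 is an assumption.

```java
import java.util.Locale;

/** Hypothetical sketch of base conversion; not the PR's implementation. */
final class ConvSketch {

    /** Parses {@code number} in {@code fromBase} and renders it in {@code toBase}. */
    static String conv(String number, int fromBase, int toBase) {
        long value = Long.parseLong(number, fromBase);
        // Long.toString emits lower-case digits; upper-casing is an assumption of this sketch.
        return Long.toString(value, toBase).toUpperCase(Locale.ROOT);
    }
}
```

Here `ConvSketch.conv("7", 10, 2)` returns `"111"`, matching the SQL example.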
   
   ## Brief change log
   
   [FLINK-35864](https://issues.apache.org/jira/browse/FLINK-35864)
   
   **This implementation is slightly different from the original Hive version. 
(BaseConversionUtils line 190)**
   
   
   ## Verifying this change
   
   `MathFunctionsITCase#convTestCases`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (docs)
   





Re: [PR] [FLINK-32501][table-planner] Fix wrong plan of a proctime window aggregation generated due to incorrect cost evaluation [flink]

2024-07-22 Thread via GitHub


Myasuka commented on PR #22964:
URL: https://github.com/apache/flink/pull/22964#issuecomment-2242408382

   It seems this backport was not merged into the release-1.17 branch?





Re: [PR] [FLINK-35318][table-planner] use UTC timezone to handle TIMESTAMP_WITHOUT_TIME_ZONE type in RexNodeToExpressionConverter [flink]

2024-07-22 Thread via GitHub


lshangq commented on PR #24787:
URL: https://github.com/apache/flink/pull/24787#issuecomment-2242267464

   @leonardBang  Could you please take a look when you have time?
   cherry-pick to release-1.19 https://github.com/apache/flink/pull/25093
   
   cherry-pick to release-1.20 https://github.com/apache/flink/pull/25094





Re: [PR] [FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for timestamp in binlog file [flink-cdc]

2024-07-22 Thread via GitHub


shiyiky commented on PR #3432:
URL: https://github.com/apache/flink-cdc/pull/3432#issuecomment-2242253725

   @whhe PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]

2024-07-22 Thread via GitHub


hackergin commented on PR #25108:
URL: https://github.com/apache/flink/pull/25108#issuecomment-2242204010

   @lsyldliu Thank you for reviewing. I have addressed the related comments; 
please have a look.





Re: [PR] [FLINK-35865][base] Support Byte and Short in ObjectUtils [flink-cdc]

2024-07-21 Thread via GitHub


GOODBOY008 commented on PR #3481:
URL: https://github.com/apache/flink-cdc/pull/3481#issuecomment-2242135166

   @lvyanquan PTAL
   





Re: [PR] [FLINK-35871][doc] add "snapshot" to mysql connector startup options. [flink-cdc]

2024-07-21 Thread via GitHub


leonardBang merged PR #3484:
URL: https://github.com/apache/flink-cdc/pull/3484





[PR] [minor][cdc-connector][mysql] catch all exceptions to display the table info [flink-cdc]

2024-07-21 Thread via GitHub


lvyanquan opened a new pull request, #3485:
URL: https://github.com/apache/flink-cdc/pull/3485

   `parseDDL` doesn't always throw `ParsingException`, and the table info is 
lost in that case.
   
![image](https://github.com/user-attachments/assets/5d053c50-3b80-4a9b-9994-a86a8c9a6be9)
   





Re: [PR] [FLINK-27355][runtime] Unregister JobManagerRunner after it's closed [flink]

2024-07-21 Thread via GitHub


kumar-mallikarjuna commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1685926520


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##
@@ -97,6 +106,14 @@ public JobManagerRunner unregister(JobID jobId) {
 return this.jobManagerRunners.remove(jobId);
 }
 
+public void setMainThreadExecutor(ComponentMainThreadExecutor executor) {
+mainThreadExecutor = executor;
+}
+
+public ComponentMainThreadExecutor getMainThreadExecutor() {
+return mainThreadExecutor;
+}
+

Review Comment:
   > But what we can do is make passing the main thread an implementation 
detail of the `DispatcherResourceCleanerFactory`: We could introduce a twin 
interface `LocallyCleanableInMainThreadResource`:
   > 
   > ```java
   > @FunctionalInterface
   > public interface LocallyCleanableInMainThreadResource {
   > 
   > CompletableFuture<Void> localCleanupAsync(
   > JobID jobId, Executor cleanupExecutor, Executor 
mainThreadExecutor);
   > }
   > ```
   > 
   > That interface can be converted to a `LocallyCleanableResource` in 
`DispatcherResourceCleanerFactory`. We wouldn't have to change the `Dispatcher` 
instantiation in that case. WDYT?
   
   I didn't get this. Which classes would implement 
`LocallyCleanableInMainThreadResource`? Can you elaborate more?
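
For reference, the conversion described in the quoted suggestion could look roughly like the sketch below. It is only an illustration under assumptions: both interfaces are simplified stand-ins declared locally (with a `String` job id instead of `JobID`), and `CleanupAdapterSketch` and `toLocallyCleanable` are hypothetical names, not existing Flink code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Simplified stand-ins for illustration; not the actual Flink interfaces.
final class CleanupAdapterSketch {

    @FunctionalInterface
    interface LocallyCleanableResource {
        CompletableFuture<Void> localCleanupAsync(String jobId, Executor cleanupExecutor);
    }

    @FunctionalInterface
    interface LocallyCleanableInMainThreadResource {
        CompletableFuture<Void> localCleanupAsync(
                String jobId, Executor cleanupExecutor, Executor mainThreadExecutor);
    }

    /**
     * Binds the main thread executor once, so callers only ever see the
     * two-argument LocallyCleanableResource shape.
     */
    static LocallyCleanableResource toLocallyCleanable(
            LocallyCleanableInMainThreadResource resource, Executor mainThreadExecutor) {
        return (jobId, cleanupExecutor) ->
                resource.localCleanupAsync(jobId, cleanupExecutor, mainThreadExecutor);
    }
}
```

Under this reading, a resource that must do part of its cleanup on the main thread (presumably the runner registry touched by this PR) would implement the three-argument interface, and the factory would wrap it before registering it.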






Re: [PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]

2024-07-21 Thread via GitHub


lsyldliu commented on code in PR #25108:
URL: https://github.com/apache/flink/pull/25108#discussion_r1685926056


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java:
##
@@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() {
 assertThat(actualStatement).isEqualTo(expectedStatement);
 }
 
-@Test
-void testGetPeriodRefreshPartition() {
-String schedulerTime = "2024-01-01 00:00:00";
-Map<String, String> tableOptions = new HashMap<>();
-tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
-tableOptions.put("partition.fields.hour.date-formatter", "HH");
-
+@ParameterizedTest(name = "{index}: {0}")
+@MethodSource("testData")
+void testGetPeriodRefreshPartition(TestSpec testSpec) {
 ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", 
"database", "table");
 
-Map<String, String> actualRefreshPartition =
-MaterializedTableManager.getPeriodRefreshPartition(
-schedulerTime, objectIdentifier, tableOptions, 
ZoneId.systemDefault());
-Map<String, String> expectedRefreshPartition = new HashMap<>();
-expectedRefreshPartition.put("day", "2024-01-01");
-expectedRefreshPartition.put("hour", "00");
+if (testSpec.errorMessage == null) {
+Map<String, String> actualRefreshPartition =
+MaterializedTableManager.getPeriodRefreshPartition(
+testSpec.schedulerTime,
+testSpec.freshness,
+objectIdentifier,
+testSpec.tableOptions,
+ZoneId.systemDefault());
 
-assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition);
+
assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition);
+} else {
+assertThatThrownBy(
+() ->
+
MaterializedTableManager.getPeriodRefreshPartition(
+testSpec.schedulerTime,
+testSpec.freshness,
+objectIdentifier,
+testSpec.tableOptions,
+ZoneId.systemDefault()))
+.hasMessage(testSpec.errorMessage);
+}
 }
 
-@Test
-void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() {
-// scheduler time is null
-Map<String, String> tableOptions = new HashMap<>();
-tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
-tableOptions.put("partition.fields.hour.date-formatter", "HH");
+static Stream<TestSpec> testData() {
+return Stream.of(
+// The interval of freshness match the partition specified by 
the 'date-formatter'.
+TestSpec.create()
+.schedulerTime("2024-01-01 00:00:00")
+.freshness(IntervalFreshness.ofDay("1"))
+.tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+.expectedRefreshPartition("day", "2023-12-31"),
+TestSpec.create()
+.schedulerTime("2024-01-02 00:00:00")
+.freshness(IntervalFreshness.ofDay("1"))
+.tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+.expectedRefreshPartition("day", "2024-01-01"),
+TestSpec.create()
+.schedulerTime("2024-01-02 00:00:00")
+.freshness(IntervalFreshness.ofHour("1"))
+.tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+.tableOptions("partition.fields.hour.date-formatter", 
"HH")
+.expectedRefreshPartition("day", "2024-01-01")
+.expectedRefreshPartition("hour", "23"),
+TestSpec.create()
+.schedulerTime("2024-01-02 01:00:00")
+.freshness(IntervalFreshness.ofHour("1"))
+.tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+.tableOptions("partition.fields.hour.date-formatter", 
"HH")
+.expectedRefreshPartition("day", "2024-01-02")
+.expectedRefreshPartition("hour", "00"),
+TestSpec.create()
+.schedulerTime("2024-01-01 00:00:00")
+.freshness(IntervalFreshness.ofHour("2"))
+.tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd")
+.tableOptions("partition.fields.hour.date-formatter", 
"HH")
+   

Re: [PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]

2024-07-21 Thread via GitHub


lsyldliu commented on code in PR #25108:
URL: https://github.com/apache/flink/pull/25108#discussion_r1685891811


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##
@@ -1520,6 +1520,58 @@ void 
testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws E
 .asSerializableString()));
 }
 
+@Test
+void testMaterializedTableDefinitionQueryContainsTemporaryResources() 
throws Exception {

Review Comment:
   Is this test unrelated to your fix?



##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##
@@ -1417,7 +1417,7 @@ void 
testPeriodicRefreshMaterializedTableWithPartitionOptions() throws Exception
 sessionHandle,
 materializedTableIdentifier.asSerializableString(),
 true,
-"2024-01-02 00:00:00",
+"2024-01-03 00:00:00",

Review Comment:
   I think this test should be reworked as follows:
   ```
   List<Row> data = new ArrayList<>();
   // create materialized table with partition formatter
   createAndVerifyCreateMaterializedTableWithData(
   "my_materialized_table",
   data,
   Collections.singletonMap("ds", "yyyy-MM-dd"),
   RefreshMode.FULL);
   
   ObjectIdentifier materializedTableIdentifier =
   ObjectIdentifier.of(
   fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table");
   
   // add data to all data list
   data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
   data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
   data.add(Row.of(4L, 4L, 4L, "2024-01-02"));
   
   // refresh the materialized table with period schedule
   long startTime = System.currentTimeMillis();
   OperationHandle periodRefreshTableHandle =
   service.refreshMaterializedTable(
   sessionHandle,
   materializedTableIdentifier.asSerializableString(),
   true,
   "2024-01-03 00:00:00",
   Collections.emptyMap(),
   Collections.emptyMap(),
   Collections.emptyMap());
   ```
   Only then is the periodic refresh exercised in full mode rather than 
continuous mode.



##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java:
##
@@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() {
 assertThat(actualStatement).isEqualTo(expectedStatement);
 }
 
-@Test
-void testGetPeriodRefreshPartition() {
-String schedulerTime = "2024-01-01 00:00:00";
-Map<String, String> tableOptions = new HashMap<>();
-tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
-tableOptions.put("partition.fields.hour.date-formatter", "HH");
-
+@ParameterizedTest(name = "{index}: {0}")
+@MethodSource("testData")
+void testGetPeriodRefreshPartition(TestSpec testSpec) {
 ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", 
"database", "table");
 
-Map<String, String> actualRefreshPartition =
-MaterializedTableManager.getPeriodRefreshPartition(
-schedulerTime, objectIdentifier, tableOptions, 
ZoneId.systemDefault());
-Map<String, String> expectedRefreshPartition = new HashMap<>();
-expectedRefreshPartition.put("day", "2024-01-01");
-expectedRefreshPartition.put("hour", "00");
+if (testSpec.errorMessage == null) {
+Map<String, String> actualRefreshPartition =
+MaterializedTableManager.getPeriodRefreshPartition(
+testSpec.schedulerTime,
+testSpec.freshness,
+objectIdentifier,
+testSpec.tableOptions,
+ZoneId.systemDefault());
 
-assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition);
+
assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition);
+} else {
+assertThatThrownBy(
+() ->
+
MaterializedTableManager.getPeriodRefreshPartition(
+testSpec.schedulerTime,
+testSpec.freshness,
+objectIdentifier,
+testSpec.tableOptions,
+ZoneId.systemDefault()))
+

Re: [PR] [FLINK-35237] Allow Sink to Choose HashFunction in PrePartitionOperator [flink-cdc]

2024-07-21 Thread via GitHub


leonardBang merged PR #3414:
URL: https://github.com/apache/flink-cdc/pull/3414





Re: [PR] [FLINK-34944] Use Incremental Source Framework in Flink CDC OceanBase Source Connector [flink-cdc]

2024-07-21 Thread via GitHub


github-actions[bot] commented on PR #3211:
URL: https://github.com/apache/flink-cdc/pull/3211#issuecomment-2241824995

   This pull request has been automatically marked as stale because it has not 
had recent activity for 60 days. It will be closed in 30 days if no further 
activity occurs.





Re: [PR] [FLINK-23589][flink-avro] Support microsecond precision for timestamp [flink]

2024-07-21 Thread via GitHub


asantoz commented on PR #19537:
URL: https://github.com/apache/flink/pull/19537#issuecomment-2241809600

   Hey, why is this blocked?





Re: [PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]

2024-07-21 Thread via GitHub


flinkbot commented on PR #25108:
URL: https://github.com/apache/flink/pull/25108#issuecomment-2241624001

   
   ## CI report:
   
   * 6bd73ab3f17dccbecb1113cd0f5aaedec0579f78 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-35872][table] Fix the incorrect partition generation during period refresh in Full Mode [flink]

2024-07-21 Thread via GitHub


hackergin opened a new pull request, #25108:
URL: https://github.com/apache/flink/pull/25108

   ## What is the purpose of the change
   
   *Fix the incorrect partition generation during period refresh in Full Mode*
   
   
   ## Brief change log
   - Fix the incorrect partition generation during period refresh in Full Mode
   
   
   ## Verifying this change
   
   - Add test cases to 
`MaterializedTableManagerTest#testGetPeriodRefreshPartition` to verify the 
change.
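
Judging from the test cases reviewed in this PR (for example, a scheduler time of `2024-01-01 00:00:00` with a one-day freshness yields `day=2023-12-31`), the refresh partition appears to be the scheduler time moved back by one freshness interval and then formatted per partition field. The sketch below illustrates that reading only. `PeriodRefreshPartitionSketch` and `refreshPartition` are hypothetical names, the freshness is modelled as an amount plus a `ChronoUnit`, and time zones are ignored, unlike the real `getPeriodRefreshPartition`, which takes a `ZoneId`.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only; not the MaterializedTableManager implementation.
final class PeriodRefreshPartitionSketch {

    private static final DateTimeFormatter SCHEDULER_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Computes partition values for a periodic refresh: the scheduler time is
     * shifted back by one freshness interval, then formatted once per
     * partition field using that field's date-formatter pattern.
     */
    static Map<String, String> refreshPartition(
            String schedulerTime,
            long freshnessAmount,
            ChronoUnit freshnessUnit,
            Map<String, String> partitionFormatters) {
        LocalDateTime partitionTime =
                LocalDateTime.parse(schedulerTime, SCHEDULER_TIME_FORMAT)
                        .minus(freshnessAmount, freshnessUnit);

        Map<String, String> partition = new LinkedHashMap<>();
        partitionFormatters.forEach(
                (field, pattern) ->
                        partition.put(
                                field,
                                partitionTime.format(DateTimeFormatter.ofPattern(pattern))));
        return partition;
    }
}
```

With a scheduler time of `2024-01-02 01:00:00`, a one-hour freshness, and `day`/`hour` formatters of `yyyy-MM-dd` and `HH`, this returns `day=2024-01-02` and `hour=00`, which matches the expectation in the test data.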
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (no)
   




