Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-25 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691974263


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -716,6 +695,48 @@ public CheckpointFetchResult fetchCheckpointInfo(
 }
 }
 
+    @Override
+    public CheckpointStatsResult fetchCheckpointStats(
+            String jobId, Long checkpointId, Configuration conf) {
+        try (RestClusterClient clusterClient = getClusterClient(conf)) {
+            var checkpointStatusHeaders = CheckpointStatisticDetailsHeaders.getInstance();
+            var parameters = checkpointStatusHeaders.getUnresolvedMessageParameters();
+            parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
+
+            // This was needed because the parameter is protected
+            var checkpointIdPathParameter =
+                    (CheckpointIdPathParameter) Iterables.getLast(parameters.getPathParameters());
+            checkpointIdPathParameter.resolve(checkpointId);
+
+            var response =
+                    clusterClient.sendRequest(
+                            checkpointStatusHeaders, parameters, EmptyRequestBody.getInstance());
+
+            var stats = response.get();
+            if (stats == null) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Checkpoint ID %d for job %s does not exist!",
+                                checkpointId, jobId));
+            } else if (stats instanceof CheckpointStatistics.CompletedCheckpointStatistics) {
+                return CheckpointStatsResult.completed(
+                        ((CheckpointStatistics.CompletedCheckpointStatistics) stats)
+                                .getExternalPath());
+            } else if (stats instanceof CheckpointStatistics.FailedCheckpointStatistics) {
+                return CheckpointStatsResult.error(
+                        ((CheckpointStatistics.FailedCheckpointStatistics) stats)
+                                .getFailureMessage());
+            } else if (stats instanceof CheckpointStatistics.PendingCheckpointStatistics) {
+                return CheckpointStatsResult.pending();
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Unknown checkpoint statistics result class: %s",
+                                stats.getClass().getSimpleName()));
+            }
+        } catch (Exception e) {
+            LOG.error("Exception while fetching checkpoint statistics", e);

Review Comment:
   This commit makes us ignore errors when fetching checkpoint statistics, since at that point we have already determined that the checkpoint was successful anyway. We just set an empty path: 
https://github.com/apache/flink-kubernetes-operator/pull/821/commits/637b3053b9644b3d6cadd1fb6a05e1f5aab75fa6
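
A minimal, self-contained sketch of the pattern described above: swallow the statistics-fetch error and fall back to a completed result with an empty path. The types below are simplified stand-ins, not the operator's real `CheckpointStatsResult`; see the linked commit for the actual change.

```java
import java.util.Optional;
import java.util.concurrent.Callable;

class CheckpointStatsFetchSketch {

    /** Simplified stand-in for the operator's CheckpointStatsResult. */
    static final class StatsResult {
        final Optional<String> externalPath;

        private StatsResult(Optional<String> externalPath) {
            this.externalPath = externalPath;
        }

        static StatsResult completed(String path) {
            return new StatsResult(Optional.ofNullable(path));
        }
    }

    /**
     * The checkpoint was already observed to be successful, so a failure while fetching
     * its statistics is only logged; the external path is simply left empty.
     */
    static StatsResult fetchStatsBestEffort(Callable<StatsResult> fetcher) {
        try {
            return fetcher.call();
        } catch (Exception e) {
            System.err.println("Exception while fetching checkpoint statistics: " + e);
            return StatsResult.completed(null);
        }
    }
}
```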



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35736][tests] Add migration test scripts & CI workflows [flink-cdc]

2024-07-25 Thread via GitHub


morazow commented on code in PR #3447:
URL: https://github.com/apache/flink-cdc/pull/3447#discussion_r1691956059


##
tools/mig-test/README.md:
##
@@ -0,0 +1,36 @@
+# Flink CDC Migration Test Utilities
+
+## Pipeline Jobs
+### Preparation
+
+1. Install Ruby (macOS ships with it by default)
+2. (Optional) Run `gem install terminal-table` for better display
+
+### Compile snapshot CDC versions
+3. Set `CDC_SOURCE_HOME` to the root directory of the Flink CDC git repository
+4. Go to `tools/mig-test` and run `ruby prepare_libs.rb` to download released / compile snapshot CDC versions
+
+### Run migration tests
+5. Enter `conf/` and run `docker compose up -d` to start up the test containers
+6. Set `FLINK_HOME` to the home directory of Flink
+7. Go back to `tools/mig-test` and run `ruby run_migration_test.rb` to start testing
+
+### Result
+The migration result will be displayed in the console like this:
+
+```
++--------------+-------+-------+-------+--------------+--------------+
+|                       Migration Test Result                        |
++--------------+-------+-------+-------+--------------+--------------+
+|              | 3.0.0 | 3.0.1 | 3.1.0 | 3.1-SNAPSHOT | 3.2-SNAPSHOT |
+| 3.0.0        | ❓    | ❓    | ❌    | ✅           | ✅           |
+| 3.0.1        |       | ❓    | ❌    | ✅           | ✅           |
+| 3.1.0        |       |       | ✅    | ❌           | ❌           |
+| 3.1-SNAPSHOT |       |       |       | ✅           | ✅           |
+| 3.2-SNAPSHOT |       |       |       |              | ✅           |
++--------------+-------+-------+-------+--------------+--------------+
+```

Review Comment:
   Could we add the meaning of `?` here?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-25 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691962117


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java:
##
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
+import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobReference;
+import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
+import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
+import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.util.UUID;
+
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
+import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
+import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
+
+/** Utilities class for FlinkStateSnapshot resources. */
+public class FlinkStateSnapshotUtils {
+
+/**
+ * From a snapshot reference, return its snapshot path. If a {@link 
FlinkStateSnapshot} is
+ * referenced, it will be retrieved from Kubernetes.
+ *
+ * @param kubernetesClient kubernetes client
+ * @param snapshotRef snapshot reference
+ * @return found savepoint path
+ */
+    public static String getValidatedFlinkStateSnapshotPath(
+            KubernetesClient kubernetesClient, FlinkStateSnapshotReference snapshotRef) {
+        if (StringUtils.isNotBlank(snapshotRef.getPath())) {
+            return snapshotRef.getPath();
+        }
+
+        if (StringUtils.isBlank(snapshotRef.getName())) {
+            throw new IllegalArgumentException(
+                    String.format("Invalid snapshot name: %s", snapshotRef.getName()));
+        }
+
+        var result =
+                snapshotRef.getNamespace() == null
+                        ? kubernetesClient
+                                .resources(FlinkStateSnapshot.class)
+                                .withName(snapshotRef.getName())
+                                .get()
+                        : kubernetesClient
+                                .resources(FlinkStateSnapshot.class)
+                                .inNamespace(snapshotRef.getNamespace())
+                                .withName(snapshotRef.getName())
+                                .get();
+
+        if (result == null) {
+            throw new IllegalStateException(
+                    String.format(
+

Re: [PR] [FLINK-30481][FLIP-277] GlueCatalog Implementation [flink-connector-aws]

2024-07-25 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1691963997


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** Utilities related glue Operation. */
+@Internal
+public class GlueUtils {
+
+private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class);
+
+/**
+ * Glue supports lowercase naming convention.
+ *
+ * @param name fully qualified name.
+ * @return modified name according to glue convention.
+ */
+public static String getGlueConventionalName(String name) {
+
+return name.toLowerCase(Locale.ROOT);
+}
+
+    /**
+     * Extract the location from the database properties if present and remove it from the
+     * properties; fall back to a default location if not present.
+     *
+     * @param databaseProperties database properties.
+     * @param databaseName fully qualified name for database.
+     * @return location for database.
+     */
+    public static String extractDatabaseLocation(
+            final Map<String, String> databaseProperties,
+            final String databaseName,
+            final String catalogPath) {
+        if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            return databaseProperties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            LOG.info("No location URI Set. Using Catalog Path as default");
+            return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + databaseName;
+        }
+    }
+
+    /**
+     * Extract the location from the table properties if present and remove it from the
+     * properties; fall back to a default location if not present.
+     *
+     * @param tableProperties table properties.
+     * @param tablePath fully qualified object for table.
+     * @return location for table.
+     */
+    public static String extractTableLocation(
+            final Map<String, String> tableProperties,
+            final ObjectPath tablePath,
+            final String catalogPath) {
+        if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            return tableProperties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            return catalogPath
+                    + GlueCatalogConstants.LOCATION_SEPARATOR
+                    + tablePath.getDatabaseName()
+                    + GlueCatalogConstants.LOCATION_SEPARATOR
+                    + 

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-25 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2251149507

   > @19priyadhingra The tests seems to be failing. Can we please take a look?
   > 
   > Also - it would be good if we squash the commits!
   
   1. Hi Hong, I dug deeper into the only failing test, `elementConverterWillOpenSerializationSchema` (failure logs: https://paste.amazon.com/show/dhipriya/1721770192). It complains that `TestSinkInitContext` cannot be cast to `sink2.Sink$InitContext`. I understand why it fails, but I am not sure how to fix it. The issue started after 1.19.1, where `TestSinkInitContext` was changed. One easy option would be to remove this test if it does not add much value; another fix would require a change in `flink-connector-base` to keep the old `TestSinkInitContext`. For now I removed that test; please let me know your thoughts. A minimal sketch of the type mismatch follows after this list.
   
   [https://github.com/apache/flink/blob/release-1.19.1/flink-connectors/flink-connect[…]pache/flink/connector/base/sink/writer/TestSinkInitContext.java](https://github.com/apache/flink/blob/release-1.19.1/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java#L51) -> the new `TestSinkInitContext` implements `WriterInitContext`, whereas the old `TestSinkInitContext` ([https://github.com/apache/flink/blob/release-1.18.1/flink-connectors/flink-connect[…]pache/flink/connector/base/sink/writer/TestSinkInitContext.java](https://github.com/apache/flink/blob/release-1.18.1/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java#L47)) implements `Sink.InitContext`. The new [WriterInitContext](https://github.com/apache/flink/blob/release-1.19.1/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java#L35) extends `org.apache.flink.api.connector.sink2.InitContext`, not the required `org.apache.flink.api.connector.sink2.Sink.InitContext`.
   
   2. Squashed the commits
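
   For clarity, the type mismatch described in point 1 boils down to the following minimal sketch. The interfaces here are simplified local stand-ins mirroring the hierarchy linked above (`WriterInitContext` extends `sink2.InitContext`, which is unrelated to `Sink.InitContext`); they are not the real Flink classes.

```java
/** Simplified stand-ins mirroring the sink2 interfaces involved (not the real Flink classes). */
class InitContextCastSketch {

    interface InitContext {}                            // stands in for sink2.InitContext
    interface WriterInitContext extends InitContext {}  // what the new TestSinkInitContext implements
    interface Sink {
        interface InitContext {}                        // the type the old test helper implemented
    }

    static class TestSinkInitContext implements WriterInitContext {}

    public static void main(String[] args) {
        Object ctx = new TestSinkInitContext();
        System.out.println(ctx instanceof Sink.InitContext); // false
        try {
            Sink.InitContext cast = (Sink.InitContext) ctx;
        } catch (ClassCastException e) {
            // Same failure mode as the test: a WriterInitContext is not a Sink.InitContext.
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```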


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-25 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691933742


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java:
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.snapshot;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import 
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
+import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
+import org.apache.flink.util.Preconditions;
+
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;
+
+/** The reconciler for the {@link 
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotReconciler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StateSnapshotReconciler.class);
+
+private final FlinkResourceContextFactory ctxFactory;
+private final EventRecorder eventRecorder;
+
+public void reconcile(FlinkStateSnapshotContext ctx) {
+var resource = ctx.getResource();
+
+var savepointState = resource.getStatus().getState();
+if (!TRIGGER_PENDING.equals(savepointState)) {
+return;
+}
+
+if (resource.getSpec().isSavepoint()
+&& resource.getSpec().getSavepoint().getAlreadyExists()) {
+LOG.info(
+"Snapshot {} is marked as completed in spec, skipping 
triggering savepoint.",
+resource.getMetadata().getName());
+
+FlinkStateSnapshotUtils.snapshotSuccessful(
+resource, resource.getSpec().getSavepoint().getPath(), 
true);
+return;
+}
+
+if (FlinkStateSnapshotUtils.abandonSnapshotIfJobNotRunning(
+ctx.getKubernetesClient(),
+ctx.getResource(),
+ctx.getSecondaryResource().orElse(null),
+eventRecorder)) {
+return;
+}
+
+var jobId = 
ctx.getSecondaryResource().orElseThrow().getStatus().getJobStatus().getJobId();
+
+        Optional<String> triggerIdOpt;
+try {
+triggerIdOpt = triggerCheckpointOrSavepoint(resource.getSpec(), 
ctx, jobId);
+} catch (Exception e) {
+LOG.error("Failed to trigger snapshot for resource {}", 
ctx.getResource(), e);
+throw new ReconciliationException(e);
+}
+
+if (triggerIdOpt.isEmpty()) {
+LOG.warn("Failed to trigger snapshot {}", 
resource.getMetadata().getName());
+return;
+}
+
+FlinkStateSnapshotUtils.snapshotInProgress(resource, 
triggerIdOpt.get());
+}
+
+public DeleteControl cleanup(FlinkStateSnapshotContext ctx) throws 
Exception {
+var resource = ctx.getResource();
+var state = resource.getStatus().getState();
+var resourceName = resource.getMetadata().getName();
+LOG.info("Cleaning up resource {}...", resourceName);
+
+if (resource.getSpec().isCheckpoint()) {
+return DeleteControl.defaultDelete();
+}
+if (!resource.getSpec().getSavepoint().getDisposeOnDelete()) {
+return 

Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-25 Thread via GitHub


anupamaggarwal commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691868012


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java:
##
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
+import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobReference;
+import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
+import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
+import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.util.UUID;
+
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
+import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
+import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
+
+/** Utilities class for FlinkStateSnapshot resources. */
+public class FlinkStateSnapshotUtils {
+
+/**
+ * From a snapshot reference, return its snapshot path. If a {@link 
FlinkStateSnapshot} is
+ * referenced, it will be retrieved from Kubernetes.
+ *
+ * @param kubernetesClient kubernetes client
+ * @param snapshotRef snapshot reference
+ * @return found savepoint path
+ */
+    public static String getValidatedFlinkStateSnapshotPath(
+            KubernetesClient kubernetesClient, FlinkStateSnapshotReference snapshotRef) {
+        if (StringUtils.isNotBlank(snapshotRef.getPath())) {
+            return snapshotRef.getPath();
+        }
+
+        if (StringUtils.isBlank(snapshotRef.getName())) {
+            throw new IllegalArgumentException(
+                    String.format("Invalid snapshot name: %s", snapshotRef.getName()));
+        }
+
+        var result =
+                snapshotRef.getNamespace() == null
+                        ? kubernetesClient
+                                .resources(FlinkStateSnapshot.class)
+                                .withName(snapshotRef.getName())
+                                .get()
+                        : kubernetesClient
+                                .resources(FlinkStateSnapshot.class)
+                                .inNamespace(snapshotRef.getNamespace())
+                                .withName(snapshotRef.getName())
+                                .get();
+
+        if (result == null) {
+            throw new IllegalStateException(
+                    String.format(
+

Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-25 Thread via GitHub


anupamaggarwal commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691872393


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java:
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler.snapshot;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import 
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
+import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
+import org.apache.flink.util.Preconditions;
+
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;
+
+/** The reconciler for the {@link 
org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
+@RequiredArgsConstructor
+public class StateSnapshotReconciler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(StateSnapshotReconciler.class);
+
+private final FlinkResourceContextFactory ctxFactory;
+private final EventRecorder eventRecorder;
+
+public void reconcile(FlinkStateSnapshotContext ctx) {
+var resource = ctx.getResource();
+
+var savepointState = resource.getStatus().getState();
+if (!TRIGGER_PENDING.equals(savepointState)) {
+return;
+}
+
+if (resource.getSpec().isSavepoint()
+&& resource.getSpec().getSavepoint().getAlreadyExists()) {
+LOG.info(
+"Snapshot {} is marked as completed in spec, skipping 
triggering savepoint.",
+resource.getMetadata().getName());
+
+FlinkStateSnapshotUtils.snapshotSuccessful(
+resource, resource.getSpec().getSavepoint().getPath(), 
true);
+return;
+}
+
+if (FlinkStateSnapshotUtils.abandonSnapshotIfJobNotRunning(
+ctx.getKubernetesClient(),
+ctx.getResource(),
+ctx.getSecondaryResource().orElse(null),
+eventRecorder)) {
+return;
+}
+
+var jobId = 
ctx.getSecondaryResource().orElseThrow().getStatus().getJobStatus().getJobId();
+
+        Optional<String> triggerIdOpt;
+try {
+triggerIdOpt = triggerCheckpointOrSavepoint(resource.getSpec(), 
ctx, jobId);
+} catch (Exception e) {
+LOG.error("Failed to trigger snapshot for resource {}", 
ctx.getResource(), e);
+throw new ReconciliationException(e);
+}
+
+if (triggerIdOpt.isEmpty()) {
+LOG.warn("Failed to trigger snapshot {}", 
resource.getMetadata().getName());
+return;
+}
+
+FlinkStateSnapshotUtils.snapshotInProgress(resource, 
triggerIdOpt.get());
+}
+
+public DeleteControl cleanup(FlinkStateSnapshotContext ctx) throws 
Exception {
+var resource = ctx.getResource();
+var state = resource.getStatus().getState();
+var resourceName = resource.getMetadata().getName();
+LOG.info("Cleaning up resource {}...", resourceName);
+
+if (resource.getSpec().isCheckpoint()) {
+return DeleteControl.defaultDelete();
+}
+if (!resource.getSpec().getSavepoint().getDisposeOnDelete()) {
+return 

Re: [PR] [FLINK-35897][Checkpoint] Cleanup completed resource when checkpoint is canceled [flink]

2024-07-25 Thread via GitHub


flinkbot commented on PR #25120:
URL: https://github.com/apache/flink/pull/25120#issuecomment-2250449661

   
   ## CI report:
   
   * 5b0ca74d3951873a800d5b3ef0d23736f6fab755 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35897][Checkpoint] Cleanup completed resource when checkpoint is canceled [flink]

2024-07-25 Thread via GitHub


ljz2051 opened a new pull request, #25120:
URL: https://github.com/apache/flink/pull/25120

   
   
   ## What is the purpose of the change
   
   This pull request cleans up completed checkpoint resources when the job checkpoint is canceled.
   
   
   ## Brief change log
   
   - When the asynchronous snapshot thread completes a checkpoint, clean up the completed checkpoint resources if it finds that the checkpoint has been canceled (a rough illustrative sketch follows at the end of this description).
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
AsyncSnapshotCallableTest.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
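
   As a rough illustration of the described cleanup pattern (hypothetical names, not the actual AsyncSnapshotCallable code): if cancellation races with completion, the freshly completed resource is discarded instead of being leaked.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative sketch only; names and structure are simplified. */
final class CleanupOnCancelSnapshotSketch implements Callable<String> {

    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    void cancel() {
        cancelled.set(true);
    }

    @Override
    public String call() throws Exception {
        String completedResource = doSnapshot(); // e.g. the path of an uploaded state file
        if (cancelled.get()) {
            // The checkpoint was cancelled while the asynchronous part was still running,
            // so clean up the just-completed resource instead of leaking it.
            discard(completedResource);
            throw new CancellationException("checkpoint cancelled, resource cleaned up");
        }
        return completedResource;
    }

    private String doSnapshot() {
        return "/tmp/checkpoint-chunk"; // placeholder
    }

    private void discard(String resource) {
        // delete the file / release the state handle
    }
}
```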


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35885][table] Ignore advancing window processor by watermark in window agg based on proctime [flink]

2024-07-25 Thread via GitHub


flinkbot commented on PR #25119:
URL: https://github.com/apache/flink/pull/25119#issuecomment-2250260386

   
   ## CI report:
   
   * f529368b2ff98c7a50d5e790c9b9ca879ca200a8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35885][table] Ignore advancing window processor by watermark in window agg based on proctime [flink]

2024-07-25 Thread via GitHub


xuyangzhong opened a new pull request, #25119:
URL: https://github.com/apache/flink/pull/25119

   ## What is the purpose of the change
   
   This PR ignores the advancement of the window processor caused by watermarks in processing-time (proctime) window aggregations; a small illustrative sketch is included at the end of this description.
   
   ## Brief change log
   
 - *Ignore the advancement of the window processor caused by watermark in 
proctime window agg*
 - *Add harness test for this bugfix*
   
   
   ## Verifying this change
   
   A harness test has been added to verify this pr.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? 
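
   As a rough illustration of the fix (hypothetical names, not the actual operator code): a processing-time window is driven only by processing-time timers, so watermark progress must not advance the window processor.

```java
/** Illustrative sketch only; names are simplified. */
final class ProctimeWindowProgressSketch {

    interface WindowProcessor {
        void advanceProgress(long timestamp);
    }

    private final WindowProcessor windowProcessor;
    private final boolean isEventTime;

    ProctimeWindowProgressSketch(WindowProcessor windowProcessor, boolean isEventTime) {
        this.windowProcessor = windowProcessor;
        this.isEventTime = isEventTime;
    }

    void processWatermark(long watermark) {
        // Proctime windows ignore watermarks; only event-time windows advance on them.
        if (isEventTime) {
            windowProcessor.advanceProgress(watermark);
        }
    }
}
```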
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35282][FLINK-35520] PyFlink Support for Apache Beam > 2.49 [flink]

2024-07-25 Thread via GitHub


xaniasd commented on PR #24908:
URL: https://github.com/apache/flink/pull/24908#issuecomment-2250200976

   hi there @snuyanzin @hlteoh37, are you actively working on this PR? Is there 
any way I could help with this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35894] Add Elasticsearch Sink Connector for Flink CDC Pipeline [flink-cdc]

2024-07-25 Thread via GitHub


lvyanquan commented on PR #3495:
URL: https://github.com/apache/flink-cdc/pull/3495#issuecomment-2250180097

   Thanks @proletarians for this contribution, left some comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35894] Add Elasticsearch Sink Connector for Flink CDC Pipeline [flink-cdc]

2024-07-25 Thread via GitHub


lvyanquan commented on code in PR #3495:
URL: https://github.com/apache/flink-cdc/pull/3495#discussion_r1691213209


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml:
##
@@ -0,0 +1,211 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-pipeline-connectors</artifactId>
+        <version>3.2-SNAPSHOT</version>

Review Comment:
   please use `${revision}` like other connectors.



##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java:
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.elasticsearch.serializer;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.*;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.*;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
+import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** A serializer for Event to BulkOperationVariant. */
+public class ElasticsearchEventSerializer
+        implements ElementConverter<Event, BulkOperationVariant> {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private final Map<TableId, Schema> schemaMaps = new HashMap<>();
+
+/** Format DATE type data. */
+public static final DateTimeFormatter DATE_FORMATTER =
+DateTimeFormatter.ofPattern("-MM-dd");
+
+/** Format timestamp-related type data. */
+public static final DateTimeFormatter DATE_TIME_FORMATTER =
+DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss.SS");
+
+/** ZoneId from pipeline config to support timestamp with local time zone. 
*/
+private final ZoneId pipelineZoneId;
+
+public ElasticsearchEventSerializer(ZoneId zoneId) {
+this.pipelineZoneId = zoneId;
+}
+
+@Override
+public BulkOperationVariant apply(Event event, SinkWriter.Context context) 
{
+try {
+if (event instanceof DataChangeEvent) {
+return applyDataChangeEvent((DataChangeEvent) event);
+} else if (event instanceof SchemaChangeEvent) {
+                IndexOperation<Map<String, Object>> indexOperation =
+                        applySchemaChangeEvent((SchemaChangeEvent) event);
+if (indexOperation != null) {
+return indexOperation;
+}
+}
+} catch (IOException e) {
+throw new RuntimeException("Failed to serialize event", e);
+}
+return null;
+}
+
+    private IndexOperation<Map<String, Object>> applySchemaChangeEvent(
+            SchemaChangeEvent schemaChangeEvent) throws IOException {
+TableId tableId = schemaChangeEvent.tableId();
+if (schemaChangeEvent instanceof CreateTableEvent) {
+Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema();
+schemaMaps.put(tableId, schema);
+return createSchemaIndexOperation(tableId, schema);
+} else if (schemaChangeEvent instanceof AddColumnEvent
+|| schemaChangeEvent instanceof DropColumnEvent) {
+if 

Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]

2024-07-25 Thread via GitHub


leonardBang merged PR #3489:
URL: https://github.com/apache/flink-cdc/pull/3489


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35391][cdc-connector][paimon] Bump dependency of Paimon Pipeline connector to 0.8.0 [flink-cdc]

2024-07-25 Thread via GitHub


beryllw commented on PR #3335:
URL: https://github.com/apache/flink-cdc/pull/3335#issuecomment-2249967829

   @leonardBang It seems that the failure of unit ci has nothing to do with 
Paimon's changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35016] catalog changes for model resource [flink]

2024-07-25 Thread via GitHub


twalthr commented on code in PR #25036:
URL: https://github.com/apache/flink/pull/25036#discussion_r1690005374


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
 CatalogColumnStatistics columnStatistics,
 boolean ignoreIfNotExists)
 throws PartitionNotExistException, CatalogException;
+
+// -- models  --
+
+/**
+ * Get names of all tables models under this database. An empty list is 
returned if none exists.
+ *
+ * @return a list of the names of all models in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+    default List<String> listModels(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                String.format("listModels(String) is not implemented for %s.", this.getClass()));

Review Comment:
   Let's return an empty list instead. If possible we should avoid errors to 
make the system more robust. At least on the read path, not necessarily on the 
write path.
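
   A sketch of what that lenient read-path default could look like (hypothetical, only illustrating the suggestion; checked exceptions omitted):

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical fragment illustrating the suggestion above, not the merged API. */
interface ModelCatalogSketch {

    /** Read path: a catalog that does not support models simply reports none instead of throwing. */
    default List<String> listModels(String databaseName) {
        return Collections.emptyList();
    }
}
```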



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##
@@ -790,4 +792,126 @@ void alterPartitionColumnStatistics(
 CatalogColumnStatistics columnStatistics,
 boolean ignoreIfNotExists)
 throws PartitionNotExistException, CatalogException;
+
+// -- models  --
+
+/**
+ * Get names of all tables models under this database. An empty list is 
returned if none exists.

Review Comment:
   ```suggestion
* Get names of all models under this database. An empty list is 
returned if none exists.
   ```



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogModel.java:
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Interface for a model in a catalog. */
+@PublicEvolving
+public interface CatalogModel {
+/** Returns a map of string-based model options. */
+    Map<String, String> getOptions();
+
+/** Returns a list of model changes. */
+List getModelChanges();
+
+/**
+ * Get the unresolved input schema of the model.
+ *
+ * @return unresolved input schema of the model.
+ */
+Schema getInputSchema();
+
+/**
+ * Get the unresolved output schema of the model.
+ *
+ * @return unresolved output schema of the model.
+ */
+Schema getOutputSchema();
+
+/**
+ * Get comment of the model.
+ *
+ * @return comment of the model.
+ */
+String getComment();
+
+/**
+ * Get a deep copy of the CatalogModel instance.
+ *
+ * @return a copy of the CatalogModel instance
+ */
+CatalogModel copy();
+
+/**
+ * Copy the input model options into the CatalogModel instance.
+ *
+ * @return a copy of the CatalogModel instance with new model options.
+ */
+    CatalogModel copy(Map<String, String> options);
+
+/**
+ * Creates a basic implementation of this interface.
+ *
+ * @param inputSchema unresolved input schema
+ * @param outputSchema unresolved output schema
+ * @param modelOptions model options
+ * @param comment optional comment
+ */
+static CatalogModel of(
+Schema inputSchema,
+Schema outputSchema,
+            Map<String, String> modelOptions,
+@Nullable String comment) {
+return new DefaultCatalogModel(
+inputSchema, outputSchema, modelOptions, new ArrayList<>(), 
comment);
+}
+
+/**
+ * Creates a basic implementation of this interface.
+ *
+ * @param inputSchema unresolved input schema
+ * @param outputSchema unresolved output schema
+ * @param modelChanges 

Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-25 Thread via GitHub


RocMarshal commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1691120384


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java:
##
@@ -88,7 +88,22 @@ CompletableFuture<RegistrationResponse> registerJobMaster(
     CompletableFuture<Acknowledge> declareRequiredResources(
             JobMasterId jobMasterId,
             ResourceRequirements resourceRequirements,
-            @RpcTimeout Time timeout);
+            @RpcTimeout Duration timeout);
+
+/**
+ * @deprecated Use {@link #declareRequiredResources(JobMasterId, 
ResourceRequirements,
+ * Duration)}. Declares the absolute resource requirements for a job.
+ * @param jobMasterId id of the JobMaster
+ * @param resourceRequirements resource requirements
+ * @return The confirmation that the requirements have been processed
+ */
+@Deprecated
+    default CompletableFuture<Acknowledge> declareRequiredResources(

Review Comment:
   sounds good~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35847][release] Add release note for version 1.20 [flink]

2024-07-25 Thread via GitHub


superdiaodiao commented on code in PR #25091:
URL: https://github.com/apache/flink/pull/25091#discussion_r1691112012


##
docs/content.zh/release-notes/flink-1.20.md:
##
@@ -0,0 +1,434 @@
+---
+title: "Release Notes - Flink 1.20"
+---
+
+
+# Release notes - Flink 1.20
+
+These release notes discuss important aspects, such as configuration, behavior 
or dependencies,
+that changed between Flink 1.19 and Flink 1.20. Please read these notes 
carefully if you are 
+planning to upgrade your Flink version to 1.20.
+
+### Checkpoints
+
+ Unified File Merging Mechanism for Checkpoints
+
+# [FLINK-32070](https://issues.apache.org/jira/browse/FLINK-32070)
+
+The unified file merging mechanism for checkpointing is introduced in Flink 1.20 as an MVP
+("minimum viable product") feature. It allows scattered small checkpoint files to be written into
+larger files, reducing the number of file creations and deletions and alleviating the pressure of
+file system metadata management caused by the file flooding problem during checkpoints. The
+mechanism can be enabled by setting `state.checkpoints.file-merging.enabled` to `true`. For more
+advanced options and the principles behind this feature, please refer to the `Checkpointing`
+documentation.
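
For illustration (this snippet is not part of the quoted release notes): enabling the option programmatically when assembling a job configuration could look like this, using the option key documented above.

```java
import org.apache.flink.configuration.Configuration;

class FileMergingConfigExample {
    static Configuration checkpointFileMergingEnabled() {
        Configuration conf = new Configuration();
        // Same effect as setting the key in the Flink configuration file.
        conf.setString("state.checkpoints.file-merging.enabled", "true");
        return conf;
    }
}
```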
+
+ Reorganize State & Checkpointing & Recovery Configuration
+
+# [FLINK-34255](https://issues.apache.org/jira/browse/FLINK-34255)
+
+All options related to state and checkpointing have been reorganized and categorized by the
+prefixes listed below:
+
+1. execution.checkpointing: all configurations associated with checkpointing and savepoints.
+2. execution.state-recovery: all configurations pertinent to state recovery.
+3. state.*: all configurations related to state access.
+    1. state.backend.*: specific options for individual state backends, such as RocksDB.
+    2. state.changelog: configurations for the changelog, as outlined in FLIP-158, including the options for the "Durable Short-term Log" (DSTL).
+    3. state.latency-track: configurations related to latency tracking of state access.
+
+Meanwhile, all of the original options, previously scattered across the configuration namespace, have been annotated as `@Deprecated`.
+
+ Use common thread pools when transferring RocksDB state files
+
+# [FLINK-35501](https://issues.apache.org/jira/browse/FLINK-35501)
+
+The semantics of `state.backend.rocksdb.checkpoint.transfer.thread.num` 
changed slightly:
+If negative, the common (TM) IO thread pool is used (see 
`cluster.io-pool.size`) for up/downloading RocksDB files.
+
+ Expose RocksDB bloom filter metrics
+
+# [FLINK-34386](https://issues.apache.org/jira/browse/FLINK-34386)
+
+We expose some RocksDB bloom filter metrics to monitor the effectiveness of 
bloom filter optimization:
+
+`BLOOM_FILTER_USEFUL`: times bloom filter has avoided file reads.
+`BLOOM_FILTER_FULL_POSITIVE`: times bloom FullFilter has not avoided the reads.
+`BLOOM_FILTER_FULL_TRUE_POSITIVE`: times bloom FullFilter has not avoided the 
reads and data actually exist.
+
+ Manually Compact Small SST Files
+
+# [FLINK-26050](https://issues.apache.org/jira/browse/FLINK-26050)
+
+In some cases, the number of files produced by the RocksDB state backend grows indefinitely.
+This might cause task state info (TDD and checkpoint ACK) to exceed the RPC message size and
+fail recovery/checkpointing, in addition to producing lots of small files.
+
+In Flink 1.20, you can manually merge such files in the background using the RocksDB API.
+
+### Runtime & Coordination
+
+ Support Job Recovery from JobMaster Failures for Batch Jobs
+
+# [FLINK-33892](https://issues.apache.org/jira/browse/FLINK-33892)
+
+In 1.20, we introduced a batch job recovery mechanism to enable batch jobs to 
recover as much progress as possible 
+after a JobMaster failover, avoiding the need to rerun tasks that have already 
been finished.
+
+More information about this feature and how to enable it could be found in: 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/recovery_from_job_master_failure/
+
+ Extend Curator config option for Zookeeper configuration
+
+# [FLINK-33376](https://issues.apache.org/jira/browse/FLINK-33376)
+
+Adds support for the following curator parameters: 
+`high-availability.zookeeper.client.authorization` (corresponding curator 
parameter: `authorization`),
+`high-availability.zookeeper.client.max-close-wait` (corresponding curator 
parameter: `maxCloseWaitMs`), 
+`high-availability.zookeeper.client.simulated-session-expiration-percent` 
(corresponding curator parameter: `simulatedSessionExpirationPercent`).
+
+ More fine-grained timer processing
+
+# [FLINK-20217](https://issues.apache.org/jira/browse/FLINK-20217)
+
+Firing timers can now be interrupted to speed up checkpointing. Timers that were interrupted by a checkpoint
+will be fired shortly after the checkpoint completes.
+
+By default, this feature is disabled. To enable it, please set 

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-07-25 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1691113488


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java:
##
@@ -69,46 +69,29 @@ class LocalInputPreferredSlotSharingStrategyTest {
 private TestingSchedulingExecutionVertex ev21;
 private TestingSchedulingExecutionVertex ev22;
 
-private Set slotSharingGroups;
-
-@BeforeEach
-void setUp() {
-topology = new TestingSchedulingTopology();
-
+private void setupCase() {
 ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0);
 ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1);
 
 ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0);
 ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1);
 
-final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
-slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_1);
-slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_2);
-slotSharingGroups = Collections.singleton(slotSharingGroup);
+slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_1);
+slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_2);
 }
 
-@Test
-void testCoLocationConstraintIsRespected() {
-topology.connect(ev11, ev22);
-topology.connect(ev12, ev21);
-
-final CoLocationGroup coLocationGroup =
-new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2);
-final Set coLocationGroups = 
Collections.singleton(coLocationGroup);
-
-final SlotSharingStrategy strategy =
-new LocalInputPreferredSlotSharingStrategy(
-topology, slotSharingGroups, coLocationGroups);
-
-assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2);
-
assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds())
-.contains(ev11.getId(), ev21.getId());
-
assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getExecutionVertexIds())
-.contains(ev12.getId(), ev22.getId());
+@Override
+protected SlotSharingStrategy getSlotSharingStrategy(
+SchedulingTopology topology,
+Set slotSharingGroups,
+Set coLocationGroups) {
+return new LocalInputPreferredSlotSharingStrategy(
+topology, slotSharingGroups, coLocationGroups);
 }
 
 @Test
 void testInputLocalityIsRespectedWithRescaleEdge() {
+setupCase();

Review Comment:
   Thank you very much @1996fanrui for the review.
   I updated the related parts based on your comments. PTAL when you have free time. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35847][release] Add release note for version 1.20 [flink]

2024-07-25 Thread via GitHub


superdiaodiao commented on code in PR #25091:
URL: https://github.com/apache/flink/pull/25091#discussion_r1691112012


##
docs/content.zh/release-notes/flink-1.20.md:
##
@@ -0,0 +1,434 @@
+---
+title: "Release Notes - Flink 1.20"
+---
+
+
+# Release notes - Flink 1.20
+
+These release notes discuss important aspects, such as configuration, behavior 
or dependencies,
+that changed between Flink 1.19 and Flink 1.20. Please read these notes 
carefully if you are 
+planning to upgrade your Flink version to 1.20.
+
+### Checkpoints
+
+ Unified File Merging Mechanism for Checkpoints
+
+# [FLINK-32070](https://issues.apache.org/jira/browse/FLINK-32070)
+
+The unified file merging mechanism for checkpointing is introduced to Flink 
1.20 as an MVP ("minimum viable product")
+feature, which allows scattered small checkpoint files to be written into 
larger files, reducing the
+number of file creations and file deletions and alleviating the pressure of 
file system metadata management
+raised by the file flooding problem during checkpoints. The mechanism can be 
enabled by setting
+`state.checkpoints.file-merging.enabled` to `true`. For more advanced options and the principles behind
+this feature, please refer to the `Checkpointing` documentation.
+
+ Reorganize State & Checkpointing & Recovery Configuration
+
+# [FLINK-34255](https://issues.apache.org/jira/browse/FLINK-34255)
+
+Currently, all the options about state and checkpointing are reorganized and 
categorized by
+prefixes as listed below:
+
+1. execution.checkpointing: all configurations associated with checkpointing 
and savepoint.
+2. execution.state-recovery: all configurations pertinent to state recovery.
+3. state.*: all configurations related to state access.
+1. state.backend.*: specific options for individual state backends, such 
as RocksDB.
+2. state.changelog: configurations for the changelog, as outlined in 
FLIP-158, including the options for the "Durable Short-term Log" (DSTL).
+3. state.latency-track: configurations related to the latency tracking of 
state access.
+
+At the same time, all the original options scattered everywhere are annotated as `@Deprecated`.
+
+ Use common thread pools when transferring RocksDB state files
+
+# [FLINK-35501](https://issues.apache.org/jira/browse/FLINK-35501)
+
+The semantics of `state.backend.rocksdb.checkpoint.transfer.thread.num` 
changed slightly:
+If negative, the common (TM) IO thread pool is used (see 
`cluster.io-pool.size`) for up/downloading RocksDB files.
+
+ Expose RocksDB bloom filter metrics
+
+# [FLINK-34386](https://issues.apache.org/jira/browse/FLINK-34386)
+
+We expose some RocksDB bloom filter metrics to monitor the effectiveness of 
bloom filter optimization:
+
+`BLOOM_FILTER_USEFUL`: times bloom filter has avoided file reads.
+`BLOOM_FILTER_FULL_POSITIVE`: times bloom FullFilter has not avoided the reads.
+`BLOOM_FILTER_FULL_TRUE_POSITIVE`: times bloom FullFilter has not avoided the 
reads and data actually exist.
+
+ Manually Compact Small SST Files
+
+# [FLINK-26050](https://issues.apache.org/jira/browse/FLINK-26050)
+
+In some cases, the number of files produced by the RocksDB state backend grows indefinitely. This might
+cause task state info (TDD and checkpoint ACK) to exceed the RPC message size and fail recovery/checkpoint
+in addition to having lots of small files.
+
+In Flink 1.20, you can manually merge such files in the background using 
RocksDB API.
+
+### Runtime & Coordination
+
+ Support Job Recovery from JobMaster Failures for Batch Jobs
+
+# [FLINK-33892](https://issues.apache.org/jira/browse/FLINK-33892)
+
+In 1.20, we introduced a batch job recovery mechanism to enable batch jobs to 
recover as much progress as possible 
+after a JobMaster failover, avoiding the need to rerun tasks that have already 
been finished.
+
+More information about this feature and how to enable it can be found at: 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/recovery_from_job_master_failure/
+
+ Extend Curator config option for Zookeeper configuration
+
+# [FLINK-33376](https://issues.apache.org/jira/browse/FLINK-33376)
+
+Adds support for the following curator parameters: 
+`high-availability.zookeeper.client.authorization` (corresponding curator 
parameter: `authorization`),
+`high-availability.zookeeper.client.max-close-wait` (corresponding curator 
parameter: `maxCloseWaitMs`), 
+`high-availability.zookeeper.client.simulated-session-expiration-percent` 
(corresponding curator parameter: `simulatedSessionExpirationPercent`).
+
+ More fine-grained timer processing
+
+# [FLINK-20217](https://issues.apache.org/jira/browse/FLINK-20217)
+
+Firing timers can now be interrupted to speed up checkpointing. Timers that were interrupted by a checkpoint
+will be fired shortly after the checkpoint completes.
+
+By default, this feature is disabled. To enable it, please set 

Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]

2024-07-25 Thread via GitHub


superdiaodiao commented on code in PR #25109:
URL: https://github.com/apache/flink/pull/25109#discussion_r1691055255


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MathFunctionsITCase.java:
##
@@ -141,4 +178,181 @@ Stream getTestSetSpecs() {
 new BigDecimal("123.45"),
 DataTypes.DECIMAL(6, 2).notNull()));
 }
+
+private Stream convTestCases() {
+return Stream.of(
+TestSetSpec.forFunction(BuiltInFunctionDefinitions.CONV)
+.onFieldsWithData(
+null, null, "100", "", "11abc", "\u000B\f   
4521   \n\r\t")
+.andDataTypes(
+DataTypes.INT(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING())
+// null input
+.testResult($("f0").conv(2, 2), "CONV(f0, 2, 2)", 
null, DataTypes.STRING())
+.testResult($("f1").conv(2, 2), "CONV(f1, 2, 2)", 
null, DataTypes.STRING())
+.testResult(
+$("f2").conv($("f0"), 2),
+"CONV(f2, f0, 2)",
+null,
+DataTypes.STRING())
+.testResult(
+$("f2").conv(2, $("f0")),
+"CONV(f2, 2, f0)",
+null,
+DataTypes.STRING())
+// empty string
+.testResult($("f3").conv(2, 4), "CONV(f3, 2, 4)", 
null, DataTypes.STRING())
+// invalid fromBase
+.testResult($("f2").conv(1, 2), "CONV(f2, 1, 2)", 
null, DataTypes.STRING())
+.testResult(
+$("f2").conv(40, 2), "CONV(f2, 40, 2)", null, 
DataTypes.STRING())
+// invalid toBase
+.testResult(
+$("f2").conv(2, -1), "CONV(f2, 2, -1)", null, 
DataTypes.STRING())
+.testResult(
+$("f2").conv(2, -40), "CONV(f2, 2, -40)", 
null, DataTypes.STRING())
+// invalid num format, ignore suffix
+.testResult(
+$("f4").conv(10, 16), "CONV(f4, 10, 16)", "B", 
DataTypes.STRING())
+// num overflow
+.testTableApiRuntimeError(
+lit("").conv(16, 16),
+NumberFormatException.class,
+"The number  overflows.")
+.testSqlRuntimeError(
+"CONV('', 16, 16)",
+NumberFormatException.class,
+"The number  overflows.")
+.testTableApiRuntimeError(
+lit("FFFEE").conv(16, 16),
+NumberFormatException.class,
+"The number FFFEE overflows.")

Review Comment:
   Makes sense!
   Thanks~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][test] Add CDC 3.1.1 release to migration test versions [flink-cdc]

2024-07-25 Thread via GitHub


leonardBang merged PR #3426:
URL: https://github.com/apache/flink-cdc/pull/3426


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-25 Thread via GitHub


1996fanrui commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1690930831


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java:
##
@@ -88,7 +88,22 @@ CompletableFuture registerJobMaster(
 CompletableFuture declareRequiredResources(
 JobMasterId jobMasterId,
 ResourceRequirements resourceRequirements,
-@RpcTimeout Time timeout);
+@RpcTimeout Duration timeout);
+
+/**
+ * @deprecated Use {@link #declareRequiredResources(JobMasterId, 
ResourceRequirements,
+ * Duration)}. Declares the absolute resource requirements for a job.
+ * @param jobMasterId id of the JobMaster
+ * @param resourceRequirements resource requirements
+ * @return The confirmation that the requirements have been processed
+ */
+@Deprecated
+default CompletableFuture declareRequiredResources(

Review Comment:
   We could refactor all callers from Time to Duration, and then the old method isn't needed anymore. Because it's not a public API, we don't need to mark it as `@Deprecated`.
   
   Be careful which commit this change belongs to.
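   
   For illustration only, a minimal sketch (hypothetical names, not the actual PR code) of what a caller looks like once the gateway method takes `java.time.Duration` directly, so no `Time.fromDuration(...)` conversion is needed:
   
   ```java
   import java.time.Duration;
   import java.util.concurrent.CompletableFuture;
   
   // Hypothetical stand-in for the gateway after the refactor: the timeout is a Duration.
   interface GatewaySketch {
       CompletableFuture<Void> declareRequiredResources(String jobMasterId, String requirements, Duration timeout);
   }
   
   final class CallerSketch {
       CompletableFuture<Void> declare(GatewaySketch gateway, Duration rpcTimeout) {
           // pass the Duration straight through, no type conversion
           return gateway.declareRequiredResources("jobMasterId", "requirements", rpcTimeout);
       }
   }
   ```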
   



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##
@@ -75,21 +76,30 @@ public class DeclarativeSlotPoolService implements 
SlotPoolService {
 @Nullable private String jobManagerAddress;
 
 private State state = State.CREATED;
+protected ComponentMainThreadExecutor componentMainThreadExecutor;

Review Comment:
   ```suggestion
   protected final ComponentMainThreadExecutor componentMainThreadExecutor;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-25 Thread via GitHub


1996fanrui commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1690930831


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java:
##
@@ -88,7 +88,22 @@ CompletableFuture registerJobMaster(
 CompletableFuture declareRequiredResources(
 JobMasterId jobMasterId,
 ResourceRequirements resourceRequirements,
-@RpcTimeout Time timeout);
+@RpcTimeout Duration timeout);
+
+/**
+ * @deprecated Use {@link #declareRequiredResources(JobMasterId, 
ResourceRequirements,
+ * Duration)}. Declares the absolute resource requirements for a job.
+ * @param jobMasterId id of the JobMaster
+ * @param resourceRequirements resource requirements
+ * @return The confirmation that the requirements have been processed
+ */
+@Deprecated
+default CompletableFuture declareRequiredResources(

Review Comment:
   We could refactor all callers from Time to Duration (only 3 callers), and then the old method isn't needed anymore. Because it's not a public API, we don't need to mark it as `@Deprecated`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-25 Thread via GitHub


RocMarshal commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1690890786


##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java:
##
@@ -154,7 +163,7 @@ private void testRegisterSlots(boolean isBlocked) {
 BlocklistDeclarativeSlotPoolBuilder.builder()
 .setBlockedTaskManagerChecker(
 isBlocked ? 
taskManager.getResourceID()::equals : ignore -> false)
-.build();
+.build(Duration.ZERO, componentMainThreadExecutor);

Review Comment:
   Thanks for the comment.
   IIUC, the test case has nothing to do with the slot request max interval. Please correct me if I'm wrong. Thanks a lot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.20][FLINK-35095][test] Fix unstable tests in `ExecutionEnvironmentImplTest` [flink]

2024-07-25 Thread via GitHub


reswqa merged PR #25111:
URL: https://github.com/apache/flink/pull/25111


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.20][FLINK-35095][test] Fix unstable tests in `ExecutionEnvironmentImplTest` [flink]

2024-07-25 Thread via GitHub


reswqa commented on PR #25111:
URL: https://github.com/apache/flink/pull/25111#issuecomment-2249702180

   Already reviewed in https://github.com/apache/flink/pull/25085.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]

2024-07-25 Thread via GitHub


dylanhz commented on code in PR #25109:
URL: https://github.com/apache/flink/pull/25109#discussion_r1691012678


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MathFunctionsITCase.java:
##
@@ -141,4 +178,181 @@ Stream getTestSetSpecs() {
 new BigDecimal("123.45"),
 DataTypes.DECIMAL(6, 2).notNull()));
 }
+
+private Stream convTestCases() {
+return Stream.of(
+TestSetSpec.forFunction(BuiltInFunctionDefinitions.CONV)
+.onFieldsWithData(
+null, null, "100", "", "11abc", "\u000B\f   
4521   \n\r\t")
+.andDataTypes(
+DataTypes.INT(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING())
+// null input
+.testResult($("f0").conv(2, 2), "CONV(f0, 2, 2)", 
null, DataTypes.STRING())
+.testResult($("f1").conv(2, 2), "CONV(f1, 2, 2)", 
null, DataTypes.STRING())
+.testResult(
+$("f2").conv($("f0"), 2),
+"CONV(f2, f0, 2)",
+null,
+DataTypes.STRING())
+.testResult(
+$("f2").conv(2, $("f0")),
+"CONV(f2, 2, f0)",
+null,
+DataTypes.STRING())
+// empty string
+.testResult($("f3").conv(2, 4), "CONV(f3, 2, 4)", 
null, DataTypes.STRING())
+// invalid fromBase
+.testResult($("f2").conv(1, 2), "CONV(f2, 1, 2)", 
null, DataTypes.STRING())
+.testResult(
+$("f2").conv(40, 2), "CONV(f2, 40, 2)", null, 
DataTypes.STRING())
+// invalid toBase
+.testResult(
+$("f2").conv(2, -1), "CONV(f2, 2, -1)", null, 
DataTypes.STRING())
+.testResult(
+$("f2").conv(2, -40), "CONV(f2, 2, -40)", 
null, DataTypes.STRING())
+// invalid num format, ignore suffix
+.testResult(
+$("f4").conv(10, 16), "CONV(f4, 10, 16)", "B", 
DataTypes.STRING())
+// num overflow
+.testTableApiRuntimeError(
+lit("").conv(16, 16),
+NumberFormatException.class,
+"The number  overflows.")
+.testSqlRuntimeError(
+"CONV('', 16, 16)",
+NumberFormatException.class,
+"The number  overflows.")
+.testTableApiRuntimeError(
+lit("FFFEE").conv(16, 16),
+NumberFormatException.class,
+"The number FFFEE overflows.")

Review Comment:
   There are three distinct numbers involved in the overflow test cases: 
   - ``, which equates to -1, will not trigger overflow in encode(). However, since -1 is inherently treated as an overflow flag, it ends up throwing an exception. 
   - `FFFEE` will trigger overflow in encode() line 97.
   - `18446744073709551616` will trigger overflow in encode() line 105.
   
   So these numbers are actually very special boundary cases and I think they 
should be kept.
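   
   A small worked example of the -1 boundary case (illustrative only; it assumes the elided hex literal is a 16-digit all-F string, which is not spelled out above):
   
   ```java
   final class ConvOverflowSketch {
       public static void main(String[] args) {
           // 16 F's = 2^64 - 1, which is the bit pattern of -1 when stored in a signed long.
           long decoded = Long.parseUnsignedLong("FFFFFFFFFFFFFFFF", 16);
           System.out.println(decoded); // prints -1
           // If -1 doubles as the overflow marker, this otherwise valid input is
           // indistinguishable from a real overflow, hence the exception in the test.
           System.out.println(Long.toUnsignedString(decoded, 16)); // prints ffffffffffffffff
       }
   }
   ```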
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-25 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1690999244


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java:
##
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
+import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobReference;
+import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
+import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
+import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.util.UUID;
+
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
+import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
+import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
+import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
+
+/** Utilities class for FlinkStateSnapshot resources. */
+public class FlinkStateSnapshotUtils {
+
+/**
+ * From a snapshot reference, return its snapshot path. If a {@link 
FlinkStateSnapshot} is
+ * referenced, it will be retrieved from Kubernetes.
+ *
+ * @param kubernetesClient kubernetes client
+ * @param snapshotRef snapshot reference
+ * @return found savepoint path
+ */
+public static String getAndValidateFlinkStateSnapshotPath(
+KubernetesClient kubernetesClient, FlinkStateSnapshotReference 
snapshotRef) {
+if (!StringUtils.isBlank(snapshotRef.getPath())) {
+return snapshotRef.getPath();
+}
+
+if (StringUtils.isBlank(snapshotRef.getName())) {
+throw new IllegalArgumentException(
+String.format("Invalid snapshot name: %s", 
snapshotRef.getName()));
+}
+
+FlinkStateSnapshot result;
+if (snapshotRef.getName() != null) {
+var namespace = snapshotRef.getNamespace();
+if (namespace == null) {
+result =
+kubernetesClient
+.resources(FlinkStateSnapshot.class)
+.withName(snapshotRef.getName())
+.get();
+} else {
+result =
+kubernetesClient
+.resources(FlinkStateSnapshot.class)
+.inNamespace(namespace)
+.withName(snapshotRef.getName())
+

Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-25 Thread via GitHub


mateczagany commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1690995181


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##
@@ -72,21 +74,26 @@ public class FlinkConfigManager {
 
 private volatile Configuration defaultConfig;
 private volatile FlinkOperatorConfiguration defaultOperatorConfiguration;
+private final boolean snapshotCrdInstalled;

Review Comment:
   Added the note, thank you. I will also note it here so others can see:
   
   This field will be determined in `FlinkOperator` when the operator starts, by checking whether the `FlinkStateSnapshot` CRD is installed on the Kubernetes cluster. 
   When generating the default configuration for the operator, `kubernetes.operator.snapshot.resource.enabled` will be set to false in this class if `snapshotCrdInstalled` is false, to allow users to continue using the operator if they upgraded from a previous version but did not install the new `FlinkStateSnapshot` CRD.
   
   This check and field are planned to be removed in the release after this FLIP gets released (so hopefully 1.11 if this PR is released in 1.10).
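   
   For illustration only, a minimal sketch (not the actual operator code) of how such a startup check could look with the fabric8 client; the CRD name below is an assumption:
   
   ```java
   import io.fabric8.kubernetes.client.KubernetesClient;
   
   final class SnapshotCrdCheckSketch {
       // Returns true if the FlinkStateSnapshot CRD is installed on the cluster.
       // The CRD name is assumed for illustration and may differ from the one shipped with the operator.
       static boolean isSnapshotCrdInstalled(KubernetesClient client) {
           return client.apiextensions().v1()
                   .customResourceDefinitions()
                   .withName("flinkstatesnapshots.flink.apache.org")
                   .get() != null;
       }
   }
   ```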



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]

2024-07-25 Thread via GitHub


superdiaodiao commented on code in PR #25109:
URL: https://github.com/apache/flink/pull/25109#discussion_r1690918688


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MathFunctionsITCase.java:
##
@@ -141,4 +178,181 @@ Stream getTestSetSpecs() {
 new BigDecimal("123.45"),
 DataTypes.DECIMAL(6, 2).notNull()));
 }
+
+private Stream convTestCases() {
+return Stream.of(
+TestSetSpec.forFunction(BuiltInFunctionDefinitions.CONV)
+.onFieldsWithData(
+null, null, "100", "", "11abc", "\u000B\f   
4521   \n\r\t")
+.andDataTypes(
+DataTypes.INT(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING())
+// null input
+.testResult($("f0").conv(2, 2), "CONV(f0, 2, 2)", 
null, DataTypes.STRING())
+.testResult($("f1").conv(2, 2), "CONV(f1, 2, 2)", 
null, DataTypes.STRING())
+.testResult(
+$("f2").conv($("f0"), 2),
+"CONV(f2, f0, 2)",
+null,
+DataTypes.STRING())
+.testResult(
+$("f2").conv(2, $("f0")),
+"CONV(f2, 2, f0)",
+null,
+DataTypes.STRING())
+// empty string
+.testResult($("f3").conv(2, 4), "CONV(f3, 2, 4)", 
null, DataTypes.STRING())
+// invalid fromBase
+.testResult($("f2").conv(1, 2), "CONV(f2, 1, 2)", 
null, DataTypes.STRING())
+.testResult(
+$("f2").conv(40, 2), "CONV(f2, 40, 2)", null, 
DataTypes.STRING())
+// invalid toBase
+.testResult(
+$("f2").conv(2, -1), "CONV(f2, 2, -1)", null, 
DataTypes.STRING())
+.testResult(
+$("f2").conv(2, -40), "CONV(f2, 2, -40)", 
null, DataTypes.STRING())
+// invalid num format, ignore suffix
+.testResult(
+$("f4").conv(10, 16), "CONV(f4, 10, 16)", "B", 
DataTypes.STRING())
+// num overflow
+.testTableApiRuntimeError(
+lit("").conv(16, 16),
+NumberFormatException.class,
+"The number  overflows.")
+.testSqlRuntimeError(
+"CONV('', 16, 16)",
+NumberFormatException.class,
+"The number  overflows.")
+.testTableApiRuntimeError(
+lit("FFFEE").conv(16, 16),
+NumberFormatException.class,
+"The number FFFEE overflows.")

Review Comment:
   @snuyanzin These two test cases (` ` vs `FFFEE`) are almost the same, do you think we should only keep one of them? Similar to the comment from another PR: 
   https://github.com/apache/flink/pull/24773#discussion_r1603091969
   
   cc @lsyldliu 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]

2024-07-25 Thread via GitHub


superdiaodiao commented on code in PR #25109:
URL: https://github.com/apache/flink/pull/25109#discussion_r1690918688


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MathFunctionsITCase.java:
##
@@ -141,4 +178,181 @@ Stream getTestSetSpecs() {
 new BigDecimal("123.45"),
 DataTypes.DECIMAL(6, 2).notNull()));
 }
+
+private Stream convTestCases() {
+return Stream.of(
+TestSetSpec.forFunction(BuiltInFunctionDefinitions.CONV)
+.onFieldsWithData(
+null, null, "100", "", "11abc", "\u000B\f   
4521   \n\r\t")
+.andDataTypes(
+DataTypes.INT(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING())
+// null input
+.testResult($("f0").conv(2, 2), "CONV(f0, 2, 2)", 
null, DataTypes.STRING())
+.testResult($("f1").conv(2, 2), "CONV(f1, 2, 2)", 
null, DataTypes.STRING())
+.testResult(
+$("f2").conv($("f0"), 2),
+"CONV(f2, f0, 2)",
+null,
+DataTypes.STRING())
+.testResult(
+$("f2").conv(2, $("f0")),
+"CONV(f2, 2, f0)",
+null,
+DataTypes.STRING())
+// empty string
+.testResult($("f3").conv(2, 4), "CONV(f3, 2, 4)", 
null, DataTypes.STRING())
+// invalid fromBase
+.testResult($("f2").conv(1, 2), "CONV(f2, 1, 2)", 
null, DataTypes.STRING())
+.testResult(
+$("f2").conv(40, 2), "CONV(f2, 40, 2)", null, 
DataTypes.STRING())
+// invalid toBase
+.testResult(
+$("f2").conv(2, -1), "CONV(f2, 2, -1)", null, 
DataTypes.STRING())
+.testResult(
+$("f2").conv(2, -40), "CONV(f2, 2, -40)", 
null, DataTypes.STRING())
+// invalid num format, ignore suffix
+.testResult(
+$("f4").conv(10, 16), "CONV(f4, 10, 16)", "B", 
DataTypes.STRING())
+// num overflow
+.testTableApiRuntimeError(
+lit("").conv(16, 16),
+NumberFormatException.class,
+"The number  overflows.")
+.testSqlRuntimeError(
+"CONV('', 16, 16)",
+NumberFormatException.class,
+"The number  overflows.")
+.testTableApiRuntimeError(
+lit("FFFEE").conv(16, 16),
+NumberFormatException.class,
+"The number FFFEE overflows.")

Review Comment:
   @snuyanzin These two test cases (` ` vs `FFFEE`) are almost the same, may I ask whether we should only keep one of them? Similar to the comment from another PR: 
   https://github.com/apache/flink/pull/24773#discussion_r1603091969
   
   cc @lsyldliu 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]

2024-07-25 Thread via GitHub


superdiaodiao commented on code in PR #25109:
URL: https://github.com/apache/flink/pull/25109#discussion_r1690918688


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MathFunctionsITCase.java:
##
@@ -141,4 +178,181 @@ Stream getTestSetSpecs() {
 new BigDecimal("123.45"),
 DataTypes.DECIMAL(6, 2).notNull()));
 }
+
+private Stream convTestCases() {
+return Stream.of(
+TestSetSpec.forFunction(BuiltInFunctionDefinitions.CONV)
+.onFieldsWithData(
+null, null, "100", "", "11abc", "\u000B\f   
4521   \n\r\t")
+.andDataTypes(
+DataTypes.INT(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING(),
+DataTypes.STRING())
+// null input
+.testResult($("f0").conv(2, 2), "CONV(f0, 2, 2)", 
null, DataTypes.STRING())
+.testResult($("f1").conv(2, 2), "CONV(f1, 2, 2)", 
null, DataTypes.STRING())
+.testResult(
+$("f2").conv($("f0"), 2),
+"CONV(f2, f0, 2)",
+null,
+DataTypes.STRING())
+.testResult(
+$("f2").conv(2, $("f0")),
+"CONV(f2, 2, f0)",
+null,
+DataTypes.STRING())
+// empty string
+.testResult($("f3").conv(2, 4), "CONV(f3, 2, 4)", 
null, DataTypes.STRING())
+// invalid fromBase
+.testResult($("f2").conv(1, 2), "CONV(f2, 1, 2)", 
null, DataTypes.STRING())
+.testResult(
+$("f2").conv(40, 2), "CONV(f2, 40, 2)", null, 
DataTypes.STRING())
+// invalid toBase
+.testResult(
+$("f2").conv(2, -1), "CONV(f2, 2, -1)", null, 
DataTypes.STRING())
+.testResult(
+$("f2").conv(2, -40), "CONV(f2, 2, -40)", 
null, DataTypes.STRING())
+// invalid num format, ignore suffix
+.testResult(
+$("f4").conv(10, 16), "CONV(f4, 10, 16)", "B", 
DataTypes.STRING())
+// num overflow
+.testTableApiRuntimeError(
+lit("").conv(16, 16),
+NumberFormatException.class,
+"The number  overflows.")
+.testSqlRuntimeError(
+"CONV('', 16, 16)",
+NumberFormatException.class,
+"The number  overflows.")
+.testTableApiRuntimeError(
+lit("FFFEE").conv(16, 16),
+NumberFormatException.class,
+"The number FFFEE overflows.")

Review Comment:
   @snuyanzin These two test cases (` ` vs `FFFEE`) are almost the same, do you think we should only keep one of them? Similar to the comment from another PR:
   https://github.com/apache/flink/pull/24773#discussion_r1603091969
   cc @lsyldliu 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-25 Thread via GitHub


RocMarshal commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1689451505


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##
@@ -81,18 +84,26 @@ public DeclarativeSlotPoolService(
 DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
 Clock clock,
 Time idleSlotTimeout,
-Time rpcTimeout) {
+Time rpcTimeout,
+Duration slotRequestMaxInterval,
+@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
 this.jobId = jobId;
 this.clock = clock;
 this.rpcTimeout = rpcTimeout;
 this.registeredTaskManagers = new HashSet<>();
 
 this.declarativeSlotPool =
 declarativeSlotPoolFactory.create(
-jobId, this::declareResourceRequirements, 
idleSlotTimeout, rpcTimeout);
+jobId,
+this::declareResourceRequirements,
+idleSlotTimeout,
+rpcTimeout,
+slotRequestMaxInterval,
+componentMainThreadExecutor);
 }
 
-protected DeclarativeSlotPool getDeclarativeSlotPool() {
+@VisibleForTesting
+public DeclarativeSlotPool getDeclarativeSlotPool() {

Review Comment:
   removed



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##
@@ -81,18 +84,26 @@ public DeclarativeSlotPoolService(
 DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
 Clock clock,
 Time idleSlotTimeout,
-Time rpcTimeout) {
+Time rpcTimeout,
+Duration slotRequestMaxInterval,
+@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {

Review Comment:
   updated



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -599,4 +632,27 @@ private ResourceCounter getFulfilledRequirements(
 ResourceCounter getFulfilledResourceRequirements() {
 return fulfilledResourceRequirements;
 }
+
+@VisibleForTesting
+@Nonnull
+public ComponentMainThreadExecutor getComponentMainThreadExecutor() {
+return componentMainThreadExecutor;
+}
+
+@VisibleForTesting
+@Nonnull
+public Duration getSlotRequestMaxInterval() {
+return slotRequestMaxInterval;
+}

Review Comment:
   deleted.



##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java:
##
@@ -299,14 +310,22 @@ public BlocklistDeclarativeSlotPoolBuilder 
setBlockedTaskManagerChecker(
 return this;
 }
 
-public BlocklistDeclarativeSlotPool build() {
+public BlocklistDeclarativeSlotPool build(
+Duration slotRequestMaxInterval,
+ComponentMainThreadExecutor componentMainThreadExecutor) {
 return new BlocklistDeclarativeSlotPool(
 new JobID(),
 new DefaultAllocatedSlotPool(),
 ignored -> {},
 blockedTaskManagerChecker,
 Time.seconds(20),
-Time.seconds(20));
+Time.seconds(20),
+slotRequestMaxInterval,
+componentMainThreadExecutor);
+}
+
+public BlocklistDeclarativeSlotPool build() {
+return build(Duration.ZERO, forMainThread());

Review Comment:
   updated.



##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 

Re: [PR] [FLINK-35888][cdc-connector][paimon] Add e2e test for PaimonDataSink. [flink-cdc]

2024-07-25 Thread via GitHub


lvyanquan commented on code in PR #3491:
URL: https://github.com/apache/flink-cdc/pull/3491#discussion_r1689470540


##
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** End-to-end tests for mysql cdc to Paimon pipeline job. */
+@RunWith(Parameterized.class)
+public class MySqlToPaimonE2eITCase extends PipelineTestEnvironment {
+private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToPaimonE2eITCase.class);
+
+public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+
+private TableEnvironment tEnv;
+
+// 
--
+// MySQL Variables (we always use MySQL as the data source for easier 
verifying)
+// 
--
+protected static final String MYSQL_TEST_USER = "mysqluser";
+protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+
+@ClassRule
+public static final MySqlContainer MYSQL =
+(MySqlContainer)
+new MySqlContainer(
+MySqlVersion.V8_0) // v8 support both ARM 
and AMD architectures
+.withConfigurationOverride("docker/mysql/my.cnf")
+.withSetupSQL("docker/mysql/setup.sql")
+.withDatabaseName("flink-test")
+.withUsername("flinkuser")
+.withPassword("flinkpw")
+.withNetwork(NETWORK)
+.withNetworkAliases("mysql")
+.withLogConsumer(new Slf4jLogConsumer(LOG));
+
+protected final UniqueDatabase mysqlInventoryDatabase =
+new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+
+public MySqlToPaimonE2eITCase() throws IOException {}
+
+@BeforeClass
+public static void initializeContainers() {
+LOG.info("Starting containers...");
+Startables.deepStart(Stream.of(MYSQL)).join();
+LOG.info("Containers are started.");
+}
+
+@Before
+public void before() throws Exception {
+super.before();
+mysqlInventoryDatabase.createAndInitialize();
+tEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+}
+
+@After
+public void after() {
+super.after();
+mysqlInventoryDatabase.dropDatabase();
+}
+
+@Test
+public void testSyncWholeDatabase() throws Exception {
+  

Re: [PR] [FLINK-35167][cdc-connector] Introduce MaxCompute pipeline DataSink [flink-cdc]

2024-07-24 Thread via GitHub


dingxin-tech commented on PR #3254:
URL: https://github.com/apache/flink-cdc/pull/3254#issuecomment-2249497797

   > Thanks @dingxin-tech for the nice work, the CI wasn't triggered properly, 
we need adjust the CI setting[1] when new connector or new module joining.
   > 
   > 
[1]https://github.com/apache/flink-cdc/blob/master/.github/workflows/flink_cdc.yml
   
   Hi, I added tests for MaxCompute in the CI file. Additionally, I refactored the code to use the newly released DataSink feature, which allows specifying a HashFunction, and upgraded the ODPS SDK. Could you please review this PR again?
   @leonardBang @lvyanquan 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-24 Thread via GitHub


1996fanrui commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1690806756


##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.clock.SystemClock;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
+
+/** Test base class for {@link DeclarativeSlotPoolBridge}. */
+abstract class AbstractDeclarativeSlotPoolBridgeTest {
+
+protected static final Duration rpcTimeout = Duration.ofSeconds(20);
+protected static final JobID jobId = new JobID();
+protected static final JobMasterId jobMasterId = JobMasterId.generate();

Review Comment:
   CI failed, these names should be upper case.
   
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61067=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=6089
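   
   For reference, a minimal sketch of the renamed constants in UPPER_SNAKE_CASE (types taken from the diff above):
   
   ```java
   import java.time.Duration;
   
   import org.apache.flink.api.common.JobID;
   import org.apache.flink.runtime.jobmaster.JobMasterId;
   
   abstract class AbstractDeclarativeSlotPoolBridgeTestSketch {
       // static final fields follow the UPPER_SNAKE_CASE convention checked by the CI
       protected static final Duration RPC_TIMEOUT = Duration.ofSeconds(20);
       protected static final JobID JOB_ID = new JobID();
       protected static final JobMasterId JOB_MASTER_ID = JobMasterId.generate();
   }
   ```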



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-24 Thread via GitHub


1996fanrui commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1690742563


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##
@@ -292,7 +299,7 @@ public void connectToResourceManager(ResourceManagerGateway 
resourceManagerGatew
 resourceRequirementServiceConnectionManager.connect(
 resourceRequirements ->
 resourceManagerGateway.declareRequiredResources(
-jobMasterId, resourceRequirements, 
rpcTimeout));
+jobMasterId, resourceRequirements, 
Time.fromDuration(rpcTimeout)));

Review Comment:
   It's better to update the parameter type from `Time` to `Duration` for 
`declareRequiredResources`.
   
   If so, we could avoid an unnecessary type conversion.



##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java:
##
@@ -80,22 +88,25 @@ public DeclarativeSlotPoolBridgeBuilder 
setRequestSlotMatchingStrategy(
 return this;
 }
 
-public DeclarativeSlotPoolBridge build() {
+public DeclarativeSlotPoolBridge build(
+ComponentMainThreadExecutor componentMainThreadExecutor) {

Review Comment:
   In general, build method doesn't with any parameter, and all parameters are 
set by setter.



##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java:
##
@@ -299,14 +308,18 @@ public BlocklistDeclarativeSlotPoolBuilder 
setBlockedTaskManagerChecker(
 return this;
 }
 
-public BlocklistDeclarativeSlotPool build() {
+public BlocklistDeclarativeSlotPool build(
+Duration slotRequestMaxInterval,
+ComponentMainThreadExecutor componentMainThreadExecutor) {

Review Comment:
   In general, the build method should not take any parameters; all parameters should be set via setters.



##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.clock.SystemClock;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
+
+/** Test base class for {@link DeclarativeSlotPoolBridge}. */
+abstract class AbstractDeclarativeSlotPoolBridgeTest {
+
+protected static final Duration rpcTimeout = Duration.ofSeconds(20);
+protected static final JobID jobId = new JobID();
+protected static final JobMasterId jobMasterId = JobMasterId.generate();
+protected static final ComponentMainThreadExecutor mainThreadExecutor = 
forMainThread();

Review Comment:
   The `mainThreadExecutor` shouldn't be static, right?
   
   It will use the current thread as the executor.
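   
   A minimal sketch of the non-static alternative (using the same `forMainThread()` helper imported in the diff above), so the executor is created per test instance rather than once for the class:
   
   ```java
   import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
   
   import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
   
   abstract class SlotPoolBridgeTestBaseSketch {
       // non-static: created on the thread constructing the test instance
       protected final ComponentMainThreadExecutor mainThreadExecutor = forMainThread();
   }
   ```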



##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java:
##
@@ -58,45 +61,66 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DefaultDeclarativeSlotPool}. */
-class DefaultDeclarativeSlotPoolTest {
+@ExtendWith(ParameterizedTestExtension.class)
+class 

Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]

2024-07-24 Thread via GitHub


pgaref commented on code in PR #25115:
URL: https://github.com/apache/flink/pull/25115#discussion_r1690799595


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildTableResult;
+import static 
org.apache.flink.table.types.inference.TypeInferenceUtil.generateSignature;
+
+/**
+ * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] 
dataBasesName].sqlIdentifier
+ * statement.
+ */
+@Internal
+public class DescribeFunctionOperation implements Operation, 
ExecutableOperation {
+
+private final ObjectIdentifier sqlIdentifier;
+private final boolean isExtended;
+
+public DescribeFunctionOperation(ObjectIdentifier sqlIdentifier, boolean 
isExtended) {
+this.sqlIdentifier = sqlIdentifier;
+this.isExtended = isExtended;
+}
+
+public ObjectIdentifier getSqlIdentifier() {
+return sqlIdentifier;
+}
+
+public boolean isExtended() {
+return isExtended;
+}
+
+@Override
+public String asSummaryString() {
+Map params = new LinkedHashMap<>();
+params.put("identifier", sqlIdentifier);
+params.put("isExtended", isExtended);
+return OperationUtils.formatWithChildren(
+"DESCRIBE FUNCTION", params, Collections.emptyList(), 
Operation::asSummaryString);
+}
+
+@Override
+public TableResultInternal execute(Context ctx) {
+// DESCRIBE FUNCTION  shows all the function properties.
+Optional functionOpt =
+
ctx.getFunctionCatalog().lookupFunction(UnresolvedIdentifier.of(sqlIdentifier));
+if (!functionOpt.isPresent()) {
+throw new ValidationException(
+String.format(
+"Function with the identifier '%s' doesn't exist.",
+sqlIdentifier.asSummaryString()));
+}
+ContextResolvedFunction function = functionOpt.get();
+CatalogFunction catalogFunction = function.getCatalogFunction();
+
+if (catalogFunction == null && !isExtended) {
+throw new ValidationException(
+String.format(
+"Function with the identifier '%s' is a system 
function which can only be described by using the extended keyword. Use 
'DESCRIBE FUNCTION EXTENDED %s' to see its properties.",
+sqlIdentifier.asSummaryString(), 
sqlIdentifier.asSummaryString()));
+}
+
+List> rows = new ArrayList<>();
+if (catalogFunction != null) {
+rows.add(Arrays.asList("class name", 
catalogFunction.getClassName()));
+rows.add(
+Arrays.asList(
+"function language", 
catalogFunction.getFunctionLanguage().toString()));
+rows.add(
+Arrays.asList(
+"resource uris", 
catalogFunction.getFunctionResources().toString()));
+rows.add(Arrays.asList("temporary", function.isTemporary()));
+}
+
+if (isExtended) {
+FunctionDefinition definition = function.getDefinition();
+rows.add(Arrays.asList("kind", definition.getKind().toString()));
+

Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]

2024-07-24 Thread via GitHub


pgaref commented on code in PR #25115:
URL: https://github.com/apache/flink/pull/25115#discussion_r1690790290


##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeFunction.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * DESCRIBE FUNCTION [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier 
sql call. Here we add
+ * Rich in className to follow the convention of {@link 
org.apache.calcite.sql.SqlDescribeTable},
+ * which only had it to distinguish from calcite's original SqlDescribeTable, 
even though calcite
+ * does not have SqlDescribeFunction.
+ */
+public class SqlRichDescribeFunction extends SqlCall {
+
+public static final SqlSpecialOperator OPERATOR =
+new SqlSpecialOperator("DESCRIBE FUNCTION", SqlKind.OTHER);
+protected final SqlIdentifier functionNameIdentifier;
+private boolean isExtended;
+
+public SqlRichDescribeFunction(
+SqlParserPos pos, SqlIdentifier functionNameIdentifier, boolean 
isExtended) {
+super(pos);
+this.functionNameIdentifier = functionNameIdentifier;
+this.isExtended = isExtended;
+}
+
+@Override
+public SqlOperator getOperator() {
+return OPERATOR;
+}
+
+@Override
+public List<SqlNode> getOperandList() {
+return Collections.singletonList(functionNameIdentifier);
+}
+
+public boolean isExtended() {
+return isExtended;
+}
+
+public String[] fullFunctionName() {
+return functionNameIdentifier.names.toArray(new String[0]);

Review Comment:
   ```suggestion
   return functionNameIdentifier.names.toArray(new 
String[functionNameIdentifier.names.size()]);
   ```
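
   For what it's worth, both forms build the same array; the zero-length variant is the idiom most style guides recommend on modern JVMs, and the pre-sized variant only saves one allocation in theory. A tiny self-contained sketch (names here are made up, not the PR's classes):

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class ToArrayDemo {
       public static void main(String[] args) {
           List<String> names = Arrays.asList("catalog", "db", "func");

           // Zero-length array: the JDK sizes the result internally.
           String[] a = names.toArray(new String[0]);

           // Pre-sized array: same contents, one explicit allocation up front.
           String[] b = names.toArray(new String[names.size()]);

           System.out.println(Arrays.equals(a, b)); // true
       }
   }
   ```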
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]

2024-07-24 Thread via GitHub


pgaref commented on code in PR #25115:
URL: https://github.com/apache/flink/pull/25115#discussion_r1690785755


##
flink-table/flink-sql-client/src/test/resources/sql/function.q:
##
@@ -1,4 +1,4 @@
-# function.q - CREATE/DROP/ALTER FUNCTION
+# function.q - CREATE/DROP/ALTER/SHOW/DESCRIBE FUNCTION

Review Comment:
   ```suggestion
   # function.q - CREATE/DROP/ALTER/DESCRIBE FUNCTION
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]

2024-07-24 Thread via GitHub


pgaref commented on code in PR #25115:
URL: https://github.com/apache/flink/pull/25115#discussion_r1690786505


##
flink-table/flink-sql-client/src/test/resources/sql/function.q:
##
@@ -265,7 +265,7 @@ show user functions;
 # test alter function
 # ==
 
-alter function func11 as 
'org.apache.flink.table.client.gateway.local.ExecutorImplITCase$TestScalaFunction';
+alter function func11 as 
'org.apache.flink.table.client.gateway.ExecutorImplITCase$TestScalaFunction';

Review Comment:
   Unrelated change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]

2024-07-24 Thread via GitHub


yuxiqian commented on PR #2944:
URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2249296709

   Thanks for @lvyanquan's rapid response! Ran migration tests and it works as 
expected.
   
   * `TableChangeInfo#Serializer#magicTableId` should be named in screaming 
case `MAGIC_TABLE_ID` as a `static final` constant, or Spotless will complain.
   
   * `MySqlPipelineITCase` needs update to reflect changes in default value:
   
   ```
   Error:MySqlPipelineITCase.testInitialStartupMode:228 
   expected: CreateTableEvent{tableId=inventory_kohdit.products, 
schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` 
VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}
but was: CreateTableEvent{tableId=inventory_kohdit.products, 
schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 
'flink',`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}
   ...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]

2024-07-24 Thread via GitHub


lvyanquan commented on code in PR #2944:
URL: https://github.com/apache/flink-cdc/pull/2944#discussion_r1690719229


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java:
##
@@ -139,6 +144,10 @@ public TableChangeInfo deserialize(int version, byte[] 
serialized) throws IOExce
 try (ByteArrayInputStream bais = new 
ByteArrayInputStream(serialized);
 DataInputStream in = new DataInputStream(bais)) {
 TableId tableId = tableIdSerializer.deserialize(new 
DataInputViewStreamWrapper(in));
+if (tableId.equals(magicTableId)) {
+version = in.readInt();
+tableId = tableIdSerializer.deserialize(new 
DataInputViewStreamWrapper(in));
+}

Review Comment:
   addressed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.20][Test] Fix tests about file-merging [flink]

2024-07-24 Thread via GitHub


Zakelly merged PR #25118:
URL: https://github.com/apache/flink/pull/25118


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35858][Runtime/State] Namespace support for async state [flink]

2024-07-24 Thread via GitHub


Zakelly merged PR #25103:
URL: https://github.com/apache/flink/pull/25103


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35858][Runtime/State] Namespace support for async state [flink]

2024-07-24 Thread via GitHub


Zakelly commented on PR #25103:
URL: https://github.com/apache/flink/pull/25103#issuecomment-2249243839

   Thanks for the review, merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35835][test] Make file-merging test tolerate the scenario that source does not produce any record [flink]

2024-07-24 Thread via GitHub


Zakelly merged PR #25116:
URL: https://github.com/apache/flink/pull/25116


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]

2024-07-24 Thread via GitHub


yuxiqian commented on code in PR #2944:
URL: https://github.com/apache/flink-cdc/pull/2944#discussion_r1690696796


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java:
##
@@ -139,6 +144,10 @@ public TableChangeInfo deserialize(int version, byte[] 
serialized) throws IOExce
 try (ByteArrayInputStream bais = new 
ByteArrayInputStream(serialized);
 DataInputStream in = new DataInputStream(bais)) {
 TableId tableId = tableIdSerializer.deserialize(new 
DataInputViewStreamWrapper(in));
+if (tableId.equals(magicTableId)) {
+version = in.readInt();
+tableId = tableIdSerializer.deserialize(new 
DataInputViewStreamWrapper(in));
+}

Review Comment:
   If no `magicTableId` was present, `version` will not be overwritten and 
defaults to be latest version (`TableChangeInfo.SERIALIZER.getVersion()`). Is 
this intended?
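
   For illustration only, a minimal sketch of the magic-header pattern under discussion: legacy payloads start directly with the real id, newer payloads start with a reserved magic id followed by an explicit version. All names below are hypothetical, not the actual CDC serializer classes.

   ```java
   import java.io.*;

   public class MagicHeaderDemo {
       // Hypothetical reserved marker; the real code uses a magic TableId constant.
       static final String MAGIC_ID = "__magic__";

       static byte[] write(String realId, int version) throws IOException {
           ByteArrayOutputStream baos = new ByteArrayOutputStream();
           try (DataOutputStream out = new DataOutputStream(baos)) {
               out.writeUTF(MAGIC_ID); // marker that a version field follows
               out.writeInt(version);  // explicit serializer version
               out.writeUTF(realId);   // the actual payload
           }
           return baos.toByteArray();
       }

       static String read(byte[] bytes, int latestVersion) throws IOException {
           try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
               String id = in.readUTF();
               int version = latestVersion; // legacy payloads carry no version field
               if (MAGIC_ID.equals(id)) {
                   version = in.readInt();  // newer payloads: version, then the real id
                   id = in.readUTF();
               }
               return id + " (version " + version + ")";
           }
       }

       public static void main(String[] args) throws IOException {
           System.out.println(read(write("db.table", 2), 2)); // db.table (version 2)
       }
   }
   ```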



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]

2024-07-24 Thread via GitHub


lvyanquan commented on PR #2944:
URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2249216319

   @yuxiqian  I've added a magic header to the SERIALIZER of TableChangeInfo to distinguish state written by 3.1.1 from later versions, could you help to check again?
   This change can only be verified after https://github.com/apache/flink-cdc/pull/3426 plus this PR, so I tend to add it in the current PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34877][cdc] Flink CDC pipeline transform supports type conversion [flink-cdc]

2024-07-24 Thread via GitHub


yuxiqian commented on code in PR #3357:
URL: https://github.com/apache/flink-cdc/pull/3357#discussion_r1689894811


##
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java:
##
@@ -555,6 +574,64 @@ void testTimestampDiffTransform() throws Exception {
 .isEqualTo(new StreamRecord<>(insertEventExpect));
 }
 
+@Test
+void testNullCastTransform() throws Exception {

Review Comment:
   Great to see this! Could you please insert more non-null data rows to test non-null value casting and verify that the casting results are correct? Maybe `FlinkPipelineComposerITCase` is a good place for this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28023][docs]Translate "Native Kubernetes" page into Chinese [flink]

2024-07-24 Thread via GitHub


bobobozh closed pull request #25098: [FLINK-28023][docs]Translate "Native 
Kubernetes" page into Chinese
URL: https://github.com/apache/flink/pull/25098


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][test] Add CDC 3.1.1 release to migration test versions [flink-cdc]

2024-07-24 Thread via GitHub


yuxiqian commented on PR #3426:
URL: https://github.com/apache/flink-cdc/pull/3426#issuecomment-2249185454

   Could this be merged to ensure our next version is compatible with 3.1.1? 
@leonardBang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]

2024-07-24 Thread via GitHub


AlanConfluent commented on code in PR #25115:
URL: https://github.com/apache/flink/pull/25115#discussion_r1690520725


##
flink-table/flink-sql-client/src/test/resources/sql/function.q:
##
@@ -346,3 +346,69 @@ show user functions;
 SHOW JARS;
 Empty set
 !ok
+
+# ==
+# test describe function
+# ==
+
+ADD JAR '$VAR_UDF_JAR_PATH';
+[INFO] Execute statement succeeded.
+!info
+
+describe function temp_upperudf;
++---+-$VAR_UDF_JAR_PATH_DASH+
+| info name | $VAR_UDF_JAR_PATH_SPACE 
info value |

Review Comment:
   What are these variables (`VAR_UDF_JAR_PATH_SPACE`)? Some of the docs seem to have them and others don't, but they make the source file hard to read.



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildTableResult;
+import static 
org.apache.flink.table.types.inference.TypeInferenceUtil.generateSignature;
+
+/**
+ * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] 
dataBasesName].sqlIdentifier
+ * statement.
+ */
+@Internal
+public class DescribeFunctionOperation implements Operation, 
ExecutableOperation {
+
+private final ObjectIdentifier sqlIdentifier;
+private final boolean isExtended;
+
+public DescribeFunctionOperation(ObjectIdentifier sqlIdentifier, boolean 
isExtended) {
+this.sqlIdentifier = sqlIdentifier;
+this.isExtended = isExtended;
+}
+
+public ObjectIdentifier getSqlIdentifier() {
+return sqlIdentifier;
+}
+
+public boolean isExtended() {
+return isExtended;
+}
+
+@Override
+public String asSummaryString() {
+Map<String, Object> params = new LinkedHashMap<>();
+params.put("identifier", sqlIdentifier);
+params.put("isExtended", isExtended);
+return OperationUtils.formatWithChildren(
+"DESCRIBE FUNCTION", params, Collections.emptyList(), 
Operation::asSummaryString);
+}
+
+@Override
+public TableResultInternal execute(Context ctx) {
+// DESCRIBE FUNCTION  shows all the function properties.
+Optional<ContextResolvedFunction> functionOpt =
+
ctx.getFunctionCatalog().lookupFunction(UnresolvedIdentifier.of(sqlIdentifier));
+if (!functionOpt.isPresent()) {
+throw new ValidationException(
+String.format(
+"Function with the identifier '%s' doesn't exist.",
+sqlIdentifier.asSummaryString()));
+}
+ContextResolvedFunction function = functionOpt.get();
+CatalogFunction catalogFunction = function.getCatalogFunction();
+
+if (catalogFunction == null && !isExtended) {
+throw new ValidationException(
+String.format(
+"Function with the identifier '%s' is a system 
function which can only be described by using the extended keyword. Use 
'DESCRIBE FUNCTION EXTENDED %s' to see its properties.",
+sqlIdentifier.asSummaryString(), 

Re: [PR] [FLINK-35500][Connectors/DynamoDB] DynamoDb Table API Sink fails to delete elements due to key not found [flink-connector-aws]

2024-07-24 Thread via GitHub


boring-cyborg[bot] commented on PR #152:
URL: 
https://github.com/apache/flink-connector-aws/pull/152#issuecomment-2248823319

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35500][Connectors/DynamoDB] DynamoDb Table API Sink fails to delete elements due to key not found [flink-connector-aws]

2024-07-24 Thread via GitHub


robg-eb opened a new pull request, #152:
URL: https://github.com/apache/flink-connector-aws/pull/152

   ## Purpose of the change
   
   When `DynamoDbSink` is used with CDC sources, it fails to process DELETE 
records and throws
   ```
   
org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
 The provided key element does not match the schema
   ```
   
   This is due to `DynamoDbSinkWriter` passing the whole DynamoDb Item as key 
instead of the constructed primary key alone.
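
   Conceptually, the fix boils down to building the DELETE key from the declared primary key fields only, instead of reusing the whole converted item. A rough sketch with the AWS SDK v2 model types (field names are made up; this is not the connector's actual code):

   ```java
   import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   public class DeleteKeyDemo {
       public static void main(String[] args) {
           // Full converted item, as the sink writer produces it for upserts.
           Map<String, AttributeValue> item = new HashMap<>();
           item.put("user_id", AttributeValue.builder().s("u-1").build());
           item.put("order_id", AttributeValue.builder().s("o-9").build());
           item.put("amount", AttributeValue.builder().n("42").build());

           // Declared PRIMARY KEY columns from the table definition.
           Set<String> primaryKeys = Set.of("user_id", "order_id");

           // DELETE requests must carry only the key attributes.
           Map<String, AttributeValue> deleteKey = new HashMap<>();
           primaryKeys.forEach(k -> deleteKey.put(k, item.get(k)));

           System.out.println(deleteKey.keySet()); // [user_id, order_id] (order may vary)
       }
   }
   ```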
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   - Added tests to `RowDataToAttributeValueConverterTest.java`:
 - `testDeleteOnlyPrimaryKey` - Ensures that for a `DELETE` request, only 
the (single) PK field is included
 - `testDeleteOnlyPrimaryKeys` - Ensures that for a `DELETE` request with a 
composite PK, both PK fields are included.
 - `testPKIgnoredForInsert` - Ensures that PK is ignored when an `INSERT` 
request is done, and all fields continue to be included as they have been in 
the past.
 - `testPKIgnoredForUpdateAfter` - Ensures that PK is ignored when an 
`UPDATE_AFTER` request is done, and all fields continue to be included as they 
have been in the past.
   
   - Ran manual tests following the steps noted in 
https://issues.apache.org/jira/browse/FLINK-35500 under "Steps To Reproduce".   
Running the SQL statement as described in Step 6 now properly runs a DELETE in 
DynamoDB.
   

   ## Significant changes
   Previously, the `PRIMARY KEY` field had no significance for a DynamoDB Sink 
via Table API.   Now, the `PRIMARY KEY` is required when processing a CDC 
stream that contains `DELETES`.This is not a 'breaking change' because the 
previous behavior for processing a CDC stream containing `DELETES` was already 
a failure (`The provided key element does not match the schema`).  This change 
now provides a clear exception informing users to specify a Primary Key to 
avoid that failure.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30481][FLIP-277] GlueCatalog Implementation [flink-connector-aws]

2024-07-24 Thread via GitHub


foxus commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1690278314


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** Utilities related glue Operation. */
+@Internal
+public class GlueUtils {
+
+private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class);
+
+/**
+ * Glue supports lowercase naming convention.
+ *
+ * @param name fully qualified name.
+ * @return modified name according to glue convention.
+ */
+public static String getGlueConventionalName(String name) {
+
+return name.toLowerCase(Locale.ROOT);
+}
+
+/**
+ * Extract location from database properties if present and remove 
location from properties.
+ * fallback to create default location if not present
+ *
+ * @param databaseProperties database properties.
+ * @param databaseName fully qualified name for database.
+ * @return location for database.
+ */
+public static String extractDatabaseLocation(
+final Map<String, String> databaseProperties,
+final String databaseName,
+final String catalogPath) {
+if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) 
{
+return 
databaseProperties.remove(GlueCatalogConstants.LOCATION_URI);
+} else {
+LOG.info("No location URI Set. Using Catalog Path as default");
+return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + 
databaseName;
+}
+}
+
+/**
+ * Extract location from database properties if present and remove 
location from properties.
+ * fallback to create default location if not present
+ *
+ * @param tableProperties table properties.
+ * @param tablePath fully qualified object for table.
+ * @return location for table.
+ */
+public static String extractTableLocation(
+final Map<String, String> tableProperties,
+final ObjectPath tablePath,
+final String catalogPath) {
+if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+return tableProperties.remove(GlueCatalogConstants.LOCATION_URI);
+} else {
+return catalogPath
++ GlueCatalogConstants.LOCATION_SEPARATOR
++ tablePath.getDatabaseName()
++ GlueCatalogConstants.LOCATION_SEPARATOR
++ 

Re: [PR] [FLINK-30481][FLIP-277] GlueCatalog Implementation [flink-connector-aws]

2024-07-24 Thread via GitHub


foxus commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1690147540


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/operator/GlueFunctionOperator.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueUtils;
+import org.apache.flink.table.resource.ResourceUri;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import 
software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import 
software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.PrincipalType;
+import 
software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Utilities for Glue catalog Function related operations. */
+@Internal
+public class GlueFunctionOperator extends GlueOperator {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(GlueFunctionOperator.class);
+
+public GlueFunctionOperator(String catalogName, GlueClient glueClient, 
String glueCatalogId) {
+super(catalogName, glueClient, glueCatalogId);
+}
+
+/**
+ * Create a function. Function name should be handled in a 
case-insensitive way.
+ *
+ * @param functionPath path of the function
+ * @param function Flink function to be created
+ * @throws CatalogException in case of any runtime exception
+ */
+public void createGlueFunction(ObjectPath functionPath, CatalogFunction 
function)
+throws CatalogException, FunctionAlreadyExistException {
+UserDefinedFunctionInput functionInput = 
createFunctionInput(functionPath, function);
+CreateUserDefinedFunctionRequest.Builder createUDFRequest =
+CreateUserDefinedFunctionRequest.builder()
+.databaseName(functionPath.getDatabaseName())
+.catalogId(getGlueCatalogId())
+.functionInput(functionInput);
+try {
+CreateUserDefinedFunctionResponse response =
+
glueClient.createUserDefinedFunction(createUDFRequest.build());
+GlueUtils.validateGlueResponse(response);
+LOG.info("Created Function: {}", functionPath.getFullName());
+} catch (AlreadyExistsException e) {
+LOG.error(
+String.format(
+"%s.%s already Exists. Function language of type: 

Re: [PR] [FLINK-30481][FLIP-277] GlueCatalog Implementation [flink-connector-aws]

2024-07-24 Thread via GitHub


foxus commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1690285665


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** Utilities related glue Operation. */
+@Internal
+public class GlueUtils {
+
+private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class);
+
+/**
+ * Glue supports lowercase naming convention.
+ *
+ * @param name fully qualified name.
+ * @return modified name according to glue convention.
+ */
+public static String getGlueConventionalName(String name) {
+
+return name.toLowerCase(Locale.ROOT);
+}
+
+/**
+ * Extract location from database properties if present and remove 
location from properties.
+ * fallback to create default location if not present
+ *
+ * @param databaseProperties database properties.
+ * @param databaseName fully qualified name for database.
+ * @return location for database.
+ */
+public static String extractDatabaseLocation(
+final Map<String, String> databaseProperties,
+final String databaseName,
+final String catalogPath) {
+if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) 
{
+return 
databaseProperties.remove(GlueCatalogConstants.LOCATION_URI);
+} else {
+LOG.info("No location URI Set. Using Catalog Path as default");
+return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + 
databaseName;
+}
+}
+
+/**
+ * Extract location from database properties if present and remove 
location from properties.
+ * fallback to create default location if not present
+ *
+ * @param tableProperties table properties.
+ * @param tablePath fully qualified object for table.
+ * @return location for table.
+ */
+public static String extractTableLocation(
+final Map<String, String> tableProperties,
+final ObjectPath tablePath,
+final String catalogPath) {
+if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+return tableProperties.remove(GlueCatalogConstants.LOCATION_URI);
+} else {
+return catalogPath
++ GlueCatalogConstants.LOCATION_SEPARATOR
++ tablePath.getDatabaseName()
++ GlueCatalogConstants.LOCATION_SEPARATOR
++ 

Re: [PR] [FLINK-22748] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]

2024-07-24 Thread via GitHub


dmariassy commented on code in PR #109:
URL: 
https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1690320572


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##
@@ -144,14 +147,34 @@ public void open(
 valueSerialization.open(context);
 }
 
+private String getTargetTopic(RowData element) {
+final String topic = readMetadata(element, 
KafkaDynamicSink.WritableMetadata.TOPIC);
+if (topic == null && topics == null) {
+throw new IllegalArgumentException(
+"The topic of the sink record is not valid. Expected a 
single topic but no topic is set.");
+} else if (topic == null && topics.size() == 1) {
+return topics.get(0);

Review Comment:
   > then for topics.size == 1 the topic metadata column should not be writable
   
   I think that makes a lot of sense!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-22748] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]

2024-07-24 Thread via GitHub


klam-shop commented on code in PR #109:
URL: 
https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1690254530


##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##
@@ -144,14 +147,34 @@ public void open(
 valueSerialization.open(context);
 }
 
+private String getTargetTopic(RowData element) {
+final String topic = readMetadata(element, 
KafkaDynamicSink.WritableMetadata.TOPIC);
+if (topic == null && topics == null) {
+throw new IllegalArgumentException(
+"The topic of the sink record is not valid. Expected a 
single topic but no topic is set.");
+} else if (topic == null && topics.size() == 1) {
+return topics.get(0);

Review Comment:
   Good idea. I thought about it and felt it might make the semantics a bit complicated: if we do this, then for `topics.size == 1` the `topic` metadata column should not be writable. But I think it's worth saving the processing. Updated 



##
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java:
##
@@ -144,14 +147,34 @@ public void open(
 valueSerialization.open(context);
 }
 
+private String getTargetTopic(RowData element) {
+final String topic = readMetadata(element, 
KafkaDynamicSink.WritableMetadata.TOPIC);
+if (topic == null && topics == null) {
+throw new IllegalArgumentException(
+"The topic of the sink record is not valid. Expected a 
single topic but no topic is set.");
+} else if (topic == null && topics.size() == 1) {
+return topics.get(0);

Review Comment:
   Good idea. I thought about it before and felt it might make the semantics a bit complicated: if we do this, then for `topics.size == 1` the `topic` metadata column should not be writable. But I think it's worth saving the processing. Updated 
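
   The resolution order being discussed is roughly: use the `topic` metadata value when present, fall back to the single configured topic otherwise, and reject values outside the configured list. A standalone sketch of that logic under those assumptions (plain Java, only an approximation of the connector behavior, not its actual classes):

   ```java
   import java.util.List;

   public class TopicResolutionDemo {
       /** Resolves the target topic for one record; metadataTopic may be null. */
       static String resolveTopic(String metadataTopic, List<String> configuredTopics) {
           if (metadataTopic == null) {
               if (configuredTopics != null && configuredTopics.size() == 1) {
                   return configuredTopics.get(0); // a single configured topic acts as the default
               }
               throw new IllegalArgumentException("No topic metadata and no single default topic.");
           }
           // Configured topics act as a whitelist when present.
           if (configuredTopics != null && !configuredTopics.contains(metadataTopic)) {
               throw new IllegalArgumentException(
                       "Topic '" + metadataTopic + "' is not in the allowed topic list.");
           }
           return metadataTopic;
       }

       public static void main(String[] args) {
           System.out.println(resolveTopic(null, List.of("orders")));                    // orders
           System.out.println(resolveTopic("payments", List.of("orders", "payments"))); // payments
       }
   }
   ```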



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-22748] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]

2024-07-24 Thread via GitHub


klam-shop commented on code in PR #109:
URL: 
https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1690197712


##
docs/content/docs/connectors/table/kafka.md:
##
@@ -196,11 +196,11 @@ Connector Options
 
 
   topic
-  required for sink
+  optional
   yes
   (none)
   String
-  Topic name(s) to read data from when the table is used as source. It 
also supports topic list for source by separating topic by semicolon like 
'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" 
can be specified for sources. When the table is used as sink, the topic name is 
the topic to write data to. Note topic list is not supported for sinks.
+  Topic name(s) to read data from when the table is used as source, or 
allowed topics for writing when the table is used as sink. It also supports 
topic list for source by separating topic by semicolon like 
'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" 
can be specified for sources. For sinks, the topic or list of topics provided 
are they only valid values (whitelist) that can be written to the `topic` 
metadata column for determining what topic to produce to. If only a single 
topic is provided, this is the default topic to produce to. If not provided, 
any value of `topic` metadata is permitted.

Review Comment:
   Good call, fixed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add support to pass Datadog API Key as environment variable [flink]

2024-07-24 Thread via GitHub


swetakala commented on PR #19684:
URL: https://github.com/apache/flink/pull/19684#issuecomment-2248547135

   Even I need this change, I have opened ticket to address this 
https://issues.apache.org/jira/browse/FLINK-33994.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.20][Test] Fix tests about file-merging [flink]

2024-07-24 Thread via GitHub


flinkbot commented on PR #25118:
URL: https://github.com/apache/flink/pull/25118#issuecomment-2248189331

   
   ## CI report:
   
   * 78c25a829275db5bf0295ce5fefab22c89afb7b0 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [BP-1.20][Test] Fix tests about file-merging [flink]

2024-07-24 Thread via GitHub


Zakelly opened a new pull request, #25118:
URL: https://github.com/apache/flink/pull/25118

   ## What is the purpose of the change
   
   BP #25090 and #25116
   
   ## Brief change log
   
   - `SnapshotFileMergingCompatibilityITCase`
   
   ## Verifying this change
   
   Run `SnapshotFileMergingCompatibilityITCase`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-24 Thread via GitHub


ferenc-csaky commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1689490037


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##
@@ -72,21 +74,26 @@ public class FlinkConfigManager {
 
 private volatile Configuration defaultConfig;
 private volatile FlinkOperatorConfiguration defaultOperatorConfiguration;
+private final boolean snapshotCrdInstalled;

Review Comment:
   As we discussed offline, the whole snapshot CRD install check is planned for 
1 release to give more meaningful errors for users the first time they upgrade 
to an operator version that includes this feature, but will be useless after 
that. So I think it would make sense to mark this with a TODO to remove it in 
the 1.11 release.
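
   For reference, the install check itself is typically a single lookup against the apiextensions API. A hedged Fabric8 sketch; the CRD name and client wiring are assumptions, not the operator's actual code:

   ```java
   import io.fabric8.kubernetes.client.KubernetesClient;
   import io.fabric8.kubernetes.client.KubernetesClientBuilder;

   public class CrdCheckDemo {
       public static void main(String[] args) {
           try (KubernetesClient client = new KubernetesClientBuilder().build()) {
               // TODO(1.11): drop this check once the snapshot CRD can be assumed to be installed.
               boolean installed =
                       client.apiextensions().v1().customResourceDefinitions()
                               .withName("flinkstatesnapshots.flink.apache.org") // assumed CRD name
                               .get() != null;
               if (!installed) {
                   System.err.println(
                           "FlinkStateSnapshot CRD is not installed; snapshot resources will be ignored.");
               }
           }
       }
   }
   ```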



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Beifang flink 1.17 [flink]

2024-07-24 Thread via GitHub


beifang1999 closed pull request #25117: Beifang flink 1.17
URL: https://github.com/apache/flink/pull/25117


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Beifang flink 1.17 [flink]

2024-07-24 Thread via GitHub


beifang1999 opened a new pull request, #25117:
URL: https://github.com/apache/flink/pull/25117

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-24 Thread via GitHub


ferenc-csaky commented on code in PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1689526438


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##
@@ -83,20 +82,20 @@ void deleteClusterDeployment(
 Configuration conf,
 boolean deleteHaData);
 
-void cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, 
Configuration conf)
+Optional cancelSessionJob(
+FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration 
conf)
 throws Exception;
 
-void triggerSavepoint(
+String triggerSavepoint(
 String jobId,
-SnapshotTriggerType triggerType,
-SavepointInfo savepointInfo,
+org.apache.flink.core.execution.SavepointFormatType 
savepointFormatType,
+String savepointDirectory,
 Configuration conf)
 throws Exception;
 
-void triggerCheckpoint(
+String triggerCheckpoint(
 String jobId,
-SnapshotTriggerType triggerType,
-CheckpointInfo checkpointInfo,
+org.apache.flink.core.execution.CheckpointType 
checkpointFormatType,

Review Comment:
   nit: checkpointFormatType -> checkpointType



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java:
##
@@ -44,61 +45,56 @@ public class FlinkStateSnapshotContext {
 private final Context josdkContext;
 private final FlinkConfigManager configManager;
 
-private FlinkOperatorConfiguration operatorConfig;
-private Configuration referencedJobObserveConfig;
-private FlinkDeployment referencedJobFlinkDeployment;
+@Getter(lazy = true)
+private final FlinkOperatorConfiguration operatorConfig = operatorConfig();
+
+@Getter(lazy = true)
+private final Configuration referencedJobObserveConfig = 
referencedJobObserveConfig();
+
+@Getter(lazy = true)
+private final FlinkDeployment referencedJobFlinkDeployment = 
referencedJobFlinkDeployment();
 
 /**
  * @return Operator configuration for this resource.
  */
-public FlinkOperatorConfiguration getOperatorConfig() {
-if (operatorConfig != null) {
-return operatorConfig;
-}
-return operatorConfig =
-configManager.getOperatorConfiguration(
-getResource().getMetadata().getNamespace(), null);
+public FlinkOperatorConfiguration operatorConfig() {
+return configManager.getOperatorConfiguration(
+getResource().getMetadata().getNamespace(), null);
+}
+
+public Configuration referencedJobObserveConfig() {
+return 
configManager.getObserveConfig(getReferencedJobFlinkDeployment());
+}
+
+public FlinkDeployment referencedJobFlinkDeployment() {

Review Comment:
   These lazy init methods can be private.
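
   For anyone unfamiliar with `@Getter(lazy = true)`: Lombok requires the annotated field to be `private final` and generates a thread-safe, compute-once getter. A simplified hand-written equivalent (plain JDK, only an approximation of what Lombok actually generates):

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   public class LazyGetterDemo {
       private final AtomicReference<Object> operatorConfig = new AtomicReference<>();

       /** Computed once on first access, then cached; safe to call from multiple threads. */
       public String getOperatorConfig() {
           Object value = operatorConfig.get();
           if (value == null) {
               synchronized (operatorConfig) {
                   value = operatorConfig.get();
                   if (value == null) {
                       value = loadOperatorConfig(); // the expensive computation
                       operatorConfig.set(value);
                   }
               }
           }
           return (String) value;
       }

       private String loadOperatorConfig() {
           return "config-loaded-at-" + System.nanoTime();
       }

       public static void main(String[] args) {
           LazyGetterDemo demo = new LazyGetterDemo();
           // Same cached instance on every call.
           System.out.println(demo.getOperatorConfig() == demo.getOperatorConfig()); // true
       }
   }
   ```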



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##
@@ -72,21 +74,26 @@ public class FlinkConfigManager {
 
 private volatile Configuration defaultConfig;
 private volatile FlinkOperatorConfiguration defaultOperatorConfiguration;
+private final boolean snapshotCrdInstalled;

Review Comment:
   As we discussed offline, the whole snapshot CRD install check is planned for 
1 release to give more meaningful errors for users, so I think it would make 
sense to mark this with a TODO to remove it in the 1.11 release.



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java:
##
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import 

Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]

2024-07-24 Thread via GitHub


lvyanquan commented on PR #2944:
URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2247879650

   Fixed the test case.
   
   And the wrong use of SERIALIZER in 
[TransformSchemaOperator](https://github.com/apache/flink-cdc/blob/ea71b2302ddc5f9b7be65843dbf3f5bed4ca9d8e/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java#L127)
 is actually a separate problem that can be tracked in another JIRA.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]

2024-07-24 Thread via GitHub


lsyldliu commented on code in PR #25109:
URL: https://github.com/apache/flink/pull/25109#discussion_r1689382425


##
docs/data/sql_functions.yml:
##
@@ -238,6 +238,16 @@ arithmetic:
   NUMERIC.hex()
   STRING.hex()
 description: Returns a string representation of an integer NUMERIC value 
or a STRING in hex format. Returns NULL if the argument is NULL. E.g. a numeric 
20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to 
"68656C6C6F2C776F726C64".
+  - sql: CONV(num, fromBase, toBase)
+table: num.conv(fromBase, toBase)
+description: |
+  Converts num from fromBase to toBase.
+  The function supports base 2 to base 36, fromBase in [2, 36], 
ABS(toBase) in [2,36], otherwise the function returns null. 
+  If toBase is negative, num is interpreted as a signed number, otherwise 
it is treated as an unsigned number. The result is consistent with this rule. 
+  If num overflows, the function raises an error.
+  num [ | ], fromBase [ | 
 | ], toBase [ |  | ]

Review Comment:
   About `num`: I think we should tell users the exact types instead of the logical type family; users don't know it.
   For `fromBase` and `toBase`: according to the JIRA issue description, if Integer represents all integer numeric types, then BIGINT should also be supported.



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -1817,6 +1817,29 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING(
 .build();
 
+public static final BuiltInFunctionDefinition CONV =
+BuiltInFunctionDefinition.newBuilder()
+.name("CONV")
+.kind(SCALAR)
+.inputTypeStrategy(
+sequence(
+Arrays.asList("num", "fromBase", "toBase"),
+Arrays.asList(
+or(
+
logical(LogicalTypeFamily.INTEGER_NUMERIC),
+
logical(LogicalTypeFamily.CHARACTER_STRING)),
+or(
+
logical(LogicalTypeRoot.TINYINT),

Review Comment:
   I think we should also support the `BIGINT` type; I see Spark supports it as well. So the code could be optimized as follows:
   ```
   
or(logical(LogicalTypeFamily.INTEGER_NUMERIC)),
or(logical(LogicalTypeFamily.INTEGER_NUMERIC))
   ```
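
   Purely as an illustration of that suggestion (a sketch, not the final definition; it assumes the same imports and static imports as the diff above, i.e. `Arrays`, `InputTypeStrategies.sequence/or/logical` and `TypeStrategies.nullableIfArgs/explicit`, and an output strategy analogous to the HEX definition shown earlier), the widened definition might look roughly like:
   ```java
   public static final BuiltInFunctionDefinition CONV =
           BuiltInFunctionDefinition.newBuilder()
                   .name("CONV")
                   .kind(SCALAR)
                   .inputTypeStrategy(
                           sequence(
                                   Arrays.asList("num", "fromBase", "toBase"),
                                   Arrays.asList(
                                           or(
                                                   logical(LogicalTypeFamily.INTEGER_NUMERIC),
                                                   logical(LogicalTypeFamily.CHARACTER_STRING)),
                                           // whole integer family, so BIGINT is accepted as well
                                           logical(LogicalTypeFamily.INTEGER_NUMERIC),
                                           logical(LogicalTypeFamily.INTEGER_NUMERIC))))
                   .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
                   .build();
   ```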



##
docs/data/sql_functions.yml:
##
@@ -238,6 +238,16 @@ arithmetic:
   NUMERIC.hex()
   STRING.hex()
 description: Returns a string representation of an integer NUMERIC value 
or a STRING in hex format. Returns NULL if the argument is NULL. E.g. a numeric 
20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to 
"68656C6C6F2C776F726C64".
+  - sql: CONV(num, fromBase, toBase)
+table: num.conv(fromBase, toBase)
+description: |
+  Converts num from fromBase to toBase.

Review Comment:
   ```suggestion
 Converts num from fromBase to toBase.
   ```



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ConvFunction.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.BaseConversionUtils;
+
+import javax.annotation.Nullable;
+
+import static java.nio.charset.StandardCharsets.UTF_8;

Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]

2024-07-24 Thread via GitHub


leonardBang merged PR #3448:
URL: https://github.com/apache/flink-cdc/pull/3448


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]

2024-07-24 Thread via GitHub


yuxiqian commented on PR #2944:
URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2247573307

   > Done rebase.
   > 
   > Add the cause of failed case is that version info of SERIALIZER were not 
included in 
[TransformSchemaOperator](https://github.com/apache/flink-cdc/blob/ea71b2302ddc5f9b7be65843dbf3f5bed4ca9d8e/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java#L127),
 which will cause incompatible state, I would like to start a discussion with 
@aiwenmo @yuxiqian to solve it.
   
   Can we add some magic header bytes in newer serialization formats (with 
serializer version of course) to distinguish it with legacy ones?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]

2024-07-24 Thread via GitHub


yuxiqian commented on PR #2944:
URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2247569644

   > Done rebase.
   > 
   > Add the cause of failed case is that version info of SERIALIZER were not 
included in 
[TransformSchemaOperator](https://github.com/apache/flink-cdc/blob/ea71b2302ddc5f9b7be65843dbf3f5bed4ca9d8e/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java#L127),
 which will cause incompatible state, I would like to start a discussion with 
@aiwenmo @yuxiqian to solve it.
   
   My two cents: since the legacy problem is that we never serialized a version number, can we add magic header bytes to distinguish the newer versions (which carry a serializer version) from the legacy ones?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35835][test] Make file-merging test tolerate the scenario that source does not produce any record [flink]

2024-07-24 Thread via GitHub


Zakelly commented on code in PR #25116:
URL: https://github.com/apache/flink/pull/25116#discussion_r1689517664


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java:
##
@@ -344,13 +343,14 @@ private static void verifyCheckpointExistOrWaitDeleted(
 }
 
 /**
- * Traverse the checkpoint metadata and verify all the state handle is 
disposed.
+ * Traverse the checkpoint metadata and verify all the state handle exist 
or be disposed.
  *
  * @param metadata the metadata to traverse.
+ * @param exist all corresponding files of the state handle should exist
  * @return true if all corresponding files are deleted.
  */
-private static boolean verifyCheckpointDisposed(CheckpointMetadata 
metadata) {
-AtomicBoolean disposed = new AtomicBoolean(true);
+private boolean verifyCheckpointExist(CheckpointMetadata metadata, boolean 
exist) {

Review Comment:
   Because the logger is referenced [here](https://github.com/apache/flink/pull/25116/files/a2b0a3795011a8fae37480af5952e7bfface6f4a#diff-ecfc97da093a973dee815155ff11b431a45cca9853f77aab00916ad68757510dR381-R382), and it is a non-static member.
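
   A tiny sketch of the alternative (illustrative names, not the test's actual code): with a `static` logger the helper could have stayed `static`:
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class CheckpointVerification {

       // Declared static, so static helper methods can reference it directly.
       private static final Logger LOG = LoggerFactory.getLogger(CheckpointVerification.class);

       private static boolean verifyCheckpointExist(boolean expectedToExist) {
           LOG.info("Verifying checkpoint files, expected to exist: {}", expectedToExist);
           return expectedToExist; // placeholder for the real traversal logic
       }
   }
   ```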



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35858][Runtime/State] Namespace support for async state [flink]

2024-07-24 Thread via GitHub


Zakelly commented on code in PR #25103:
URL: https://github.com/apache/flink/pull/25103#discussion_r1689514188


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java:
##
@@ -56,6 +59,9 @@ public class RecordContext extends 
ReferenceCounted

Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]

2024-07-24 Thread via GitHub


leonardBang commented on code in PR #3448:
URL: https://github.com/apache/flink-cdc/pull/3448#discussion_r1689514710


##
docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md:
##
@@ -558,7 +558,10 @@ Flink 定期为 Source 执行 checkpoint,在故障转移的情况下,作业
 
 在执行增量快照读取时,MySQL CDC source 需要一个用于分片的的算法。
 MySQL CDC Source 使用主键列将表划分为多个分片(chunk)。 默认情况下,MySQL CDC source 
会识别表的主键列,并使用主键中的第一列作为用作分片列。
-如果表中没有主键, 增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 来回退到旧的快照读取机制。
+如果表中没有主键,用户必须指定 `scan.incremental.snapshot.chunk.key-column`、
+否则增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 恢复到旧的快照读取机制。
+请注意,使用不在主键中的列作为分块键可能会降低表的查询性能。
+

Review Comment:
   Sorry, I just realized that you may not speak Chinese and that this part isn't strictly necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]

2024-07-24 Thread via GitHub


SML0127 commented on PR #3448:
URL: https://github.com/apache/flink-cdc/pull/3448#issuecomment-2247463147

   @leonardBang 
   Thank you for the comments. I have updated the documents with your suggestions!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]

2024-07-24 Thread via GitHub


SML0127 commented on code in PR #3448:
URL: https://github.com/apache/flink-cdc/pull/3448#discussion_r1689496974


##
docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md:
##
@@ -493,7 +493,7 @@ CREATE TABLE products (
 * (3)在快照读取之前,Source 不需要数据库锁权限。
 
 如果希望 source 并行运行,则每个并行 reader 都应该具有唯一的 server id,因此`server id`的范围必须类似于 
`5400-6400`,
-且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 首先通过表的主键将表划分成多个块(chunk),
+且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 源首先会根据您指定的表块键将表分块(chunk),

Review Comment:
   modified.



##
docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md:
##
@@ -543,7 +543,7 @@ MySQL 集群中你监控的服务器出现故障后, 你只需将受监视的服
 
 当 MySQL CDC Source 启动时,它并行读取表的快照,然后以单并行度的方式读取表的 binlog。
 
-在快照阶段,根据表的主键和表行的大小将快照切割成多个快照块。
+在快照阶段,快照会根据表的分块键和表行的大小切割成多个快照块。

Review Comment:
   modified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35835][test] Make file-merging test tolerate the scenario that source does not produce any record [flink]

2024-07-24 Thread via GitHub


fredia commented on code in PR #25116:
URL: https://github.com/apache/flink/pull/25116#discussion_r1689498525


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java:
##
@@ -344,13 +343,14 @@ private static void verifyCheckpointExistOrWaitDeleted(
 }
 
 /**
- * Traverse the checkpoint metadata and verify all the state handle is 
disposed.
+ * Traverse the checkpoint metadata and verify all the state handle exist 
or be disposed.
  *
  * @param metadata the metadata to traverse.
+ * @param exist all corresponding files of the state handle should exist
  * @return true if all corresponding files are deleted.
  */
-private static boolean verifyCheckpointDisposed(CheckpointMetadata 
metadata) {
-AtomicBoolean disposed = new AtomicBoolean(true);
+private boolean verifyCheckpointExist(CheckpointMetadata metadata, boolean 
exist) {

Review Comment:
   Why remove the `static` mark?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35888][cdc-connector][paimon] Add e2e test for PaimonDataSink. [flink-cdc]

2024-07-24 Thread via GitHub


yuxiqian commented on code in PR #3491:
URL: https://github.com/apache/flink-cdc/pull/3491#discussion_r1689497656


##
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml:
##
@@ -99,6 +107,19 @@ limitations under the License.
 test-jar
 test
 
+
+org.apache.flink
+flink-connector-files
+${flink.version}
+test
+

Review Comment:
   I see, `paimon-flink-common` declares it as a provided dependency.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]

2024-07-24 Thread via GitHub


gyfora merged PR #855:
URL: https://github.com/apache/flink-kubernetes-operator/pull/855


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]

2024-07-24 Thread via GitHub


gyfora commented on PR #855:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/855#issuecomment-2247447589

   Thank you for the fix @chenyuzhi459 this was a very nice catch :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]

2024-07-24 Thread via GitHub


leonardBang commented on code in PR #3448:
URL: https://github.com/apache/flink-cdc/pull/3448#discussion_r1689479277


##
docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md:
##
@@ -543,7 +543,7 @@ MySQL 集群中你监控的服务器出现故障后, 你只需将受监视的服
 
 当 MySQL CDC Source 启动时,它并行读取表的快照,然后以单并行度的方式读取表的 binlog。
 
-在快照阶段,根据表的主键和表行的大小将快照切割成多个快照块。
+在快照阶段,快照会根据表的分块键和表行的大小切割成多个快照块。

Review Comment:
   在快照阶段,快照会根据表的chunk key 和表中数据行数切割成多个块(chunk)之后分布式读取。
   



##
docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md:
##
@@ -493,7 +493,7 @@ CREATE TABLE products (
 * (3)在快照读取之前,Source 不需要数据库锁权限。
 
 如果希望 source 并行运行,则每个并行 reader 都应该具有唯一的 server id,因此`server id`的范围必须类似于 
`5400-6400`,
-且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 首先通过表的主键将表划分成多个块(chunk),
+且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 源首先会根据您指定的表块键将表分块(chunk),

Review Comment:
   且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 源首先会根据您指定的chunk key将表分块(chunk)
   



##
docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md:
##
@@ -558,7 +558,10 @@ Flink 定期为 Source 执行 checkpoint,在故障转移的情况下,作业
 
 在执行增量快照读取时,MySQL CDC source 需要一个用于分片的的算法。
 MySQL CDC Source 使用主键列将表划分为多个分片(chunk)。 默认情况下,MySQL CDC source 
会识别表的主键列,并使用主键中的第一列作为用作分片列。
-如果表中没有主键, 增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 来回退到旧的快照读取机制。
+如果表中没有主键,用户必须指定 `scan.incremental.snapshot.chunk.key-column`、
+否则增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 恢复到旧的快照读取机制。
+请注意,使用不在主键中的列作为分块键可能会降低表的查询性能。

Review Comment:
   如果表中没有主键,用户需要指定 
`scan.incremental.snapshot.chunk.key-column`,否则增量快照读取将会失败,你可以禁用 
`scan.incremental.snapshot.enabled` 恢复到旧的快照读取机制。
   
   请注意:使用不在主键中的列作为chunk key可能会到查询表中数据的性能下降。
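
   To make the option concrete, a minimal sketch (hypothetical table and connection settings; only `scan.incremental.snapshot.chunk.key-column` is the option under discussion here) of declaring a table without a primary key and setting the chunk key explicitly:
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class ChunkKeyColumnExample {
       public static void main(String[] args) {
           TableEnvironment tEnv =
                   TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
           // Hypothetical table without a primary key; the chunk key column must then be set explicitly.
           tEnv.executeSql(
                   "CREATE TABLE orders (\n"
                           + "  order_id BIGINT,\n"
                           + "  customer STRING\n"
                           + ") WITH (\n"
                           + "  'connector' = 'mysql-cdc',\n"
                           + "  'hostname' = 'mysql',\n"
                           + "  'port' = '3306',\n"
                           + "  'username' = 'flinkuser',\n"
                           + "  'password' = 'flinkpw',\n"
                           + "  'database-name' = 'mydb',\n"
                           + "  'table-name' = 'orders',\n"
                           + "  'scan.incremental.snapshot.chunk.key-column' = 'order_id'\n"
                           + ")");
       }
   }
   ```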



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35888][cdc-connector][paimon] Add e2e test for PaimonDataSink. [flink-cdc]

2024-07-24 Thread via GitHub


lvyanquan commented on code in PR #3491:
URL: https://github.com/apache/flink-cdc/pull/3491#discussion_r1689460319


##
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml:
##
@@ -99,6 +107,19 @@ limitations under the License.
 test-jar
 test
 
+
+org.apache.flink
+flink-connector-files
+${flink.version}
+test
+

Review Comment:
   Yeah, if it is removed, we get the following exception:
   
![image](https://github.com/user-attachments/assets/b774d082-b903-440e-a5a0-74c11084fd5e)
   
   And flink-connector-files is also provided in the FLINK_HOME/lib directory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35888][cdc-connector][paimon] Add e2e test for PaimonDataSink. [flink-cdc]

2024-07-24 Thread via GitHub


lvyanquan commented on code in PR #3491:
URL: https://github.com/apache/flink-cdc/pull/3491#discussion_r1689470540


##
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java:
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** End-to-end tests for mysql cdc to Paimon pipeline job. */
+@RunWith(Parameterized.class)
+public class MySqlToPaimonE2eITCase extends PipelineTestEnvironment {
+private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToPaimonE2eITCase.class);
+
+public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+
+private TableEnvironment tEnv;
+
+// 
--
+// MySQL Variables (we always use MySQL as the data source for easier 
verifying)
+// 
--
+protected static final String MYSQL_TEST_USER = "mysqluser";
+protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+
+@ClassRule
+public static final MySqlContainer MYSQL =
+(MySqlContainer)
+new MySqlContainer(
+MySqlVersion.V8_0) // v8 support both ARM 
and AMD architectures
+.withConfigurationOverride("docker/mysql/my.cnf")
+.withSetupSQL("docker/mysql/setup.sql")
+.withDatabaseName("flink-test")
+.withUsername("flinkuser")
+.withPassword("flinkpw")
+.withNetwork(NETWORK)
+.withNetworkAliases("mysql")
+.withLogConsumer(new Slf4jLogConsumer(LOG));
+
+protected final UniqueDatabase mysqlInventoryDatabase =
+new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+
+public MySqlToPaimonE2eITCase() throws IOException {}
+
+@BeforeClass
+public static void initializeContainers() {
+LOG.info("Starting containers...");
+Startables.deepStart(Stream.of(MYSQL)).join();
+LOG.info("Containers are started.");
+}
+
+@Before
+public void before() throws Exception {
+super.before();
+mysqlInventoryDatabase.createAndInitialize();
+tEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+}
+
+@After
+public void after() {
+super.after();
+mysqlInventoryDatabase.dropDatabase();
+}
+
+@Test
+public void testSyncWholeDatabase() throws Exception {
+  

Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]

2024-07-24 Thread via GitHub


SML0127 commented on code in PR #3448:
URL: https://github.com/apache/flink-cdc/pull/3448#discussion_r1689464653


##
docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md:
##
@@ -558,7 +558,10 @@ Flink 定期为 Source 执行 checkpoint,在故障转移的情况下,作业
 
 在执行增量快照读取时,MySQL CDC source 需要一个用于分片的的算法。
 MySQL CDC Source 使用主键列将表划分为多个分片(chunk)。 默认情况下,MySQL CDC source 
会识别表的主键列,并使用主键中的第一列作为用作分片列。
-如果表中没有主键, 增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 来回退到旧的快照读取机制。
+如果表中没有主键,用户必须指定 `scan.incremental.snapshot.chunk.key-column`、
+否则增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 恢复到旧的快照读取机制。
+请注意,使用不在主键中的列作为分块键可能会降低表的查询性能。
+

Review Comment:
   @leonardBang 
   I modified the documents using a translator. Please check them out and feel free to correct any awkward wording.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-24 Thread via GitHub


RocMarshal commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1689426208


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java:
##
@@ -34,14 +36,18 @@ public abstract class AbstractSlotPoolServiceFactory 
implements SlotPoolServiceF
 
 @Nonnull protected final Time batchSlotTimeout;
 
+@Nonnull protected final Duration slotRequestMaxInterval;
+
 protected AbstractSlotPoolServiceFactory(
 @Nonnull Clock clock,
 @Nonnull Time rpcTimeout,
 @Nonnull Time slotIdleTimeout,
-@Nonnull Time batchSlotTimeout) {
+@Nonnull Time batchSlotTimeout,

Review Comment:
   updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]

2024-07-24 Thread via GitHub


RocMarshal commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1689424572


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -1115,6 +1116,11 @@ public CompletableFuture 
updateJobResourceRequirements(
 return CompletableFuture.completedFuture(Acknowledge.get());
 }
 
+@VisibleForTesting
+public SlotPoolService getSlotPoolService() {

Review Comment:
   Thanks for the comment.
   It's a purely redundant test case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]

2024-07-24 Thread via GitHub


mateczagany commented on PR #821:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/821#issuecomment-2247282685

   Thanks for your feedback @anupamaggarwal, really appreciate it!
   
   1. I agree, and this is what I expected to happen. Except it does not, because I forgot to add the FlinkStateSnapshot resource to the `ValidatingWebhookConfiguration` resource in the Helm chart; I will push a commit soon.
   2. This PR is already quite huge, so I have decided to do the examples/docs in a separate PR; there is currently a draft for it: https://github.com/apache/flink-kubernetes-operator/pull/854
   3. You are right, adding the config there would improve its visibility to end users, so I will do that in a future commit. Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]

2024-07-24 Thread via GitHub


SML0127 commented on PR #3448:
URL: https://github.com/apache/flink-cdc/pull/3448#issuecomment-2247249844

   @leonardBang @ruanhang1993
   PTAL~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35858][Runtime/State] Namespace support for async state [flink]

2024-07-24 Thread via GitHub


fredia commented on code in PR #25103:
URL: https://github.com/apache/flink/pull/25103#discussion_r1689372550


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java:
##
@@ -56,6 +59,9 @@ public class RecordContext extends 
ReferenceCounted

Re: [PR] [FLINK-35318][table-planner] use UTC timezone to handle TIMESTAMP_WITHOUT_TIME_ZONE type in RexNodeToExpressionConverter [flink]

2024-07-24 Thread via GitHub


leonardBang merged PR #25094:
URL: https://github.com/apache/flink/pull/25094


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35318][table-planner] use UTC timezone to handle TIMESTAMP_WITHOUT_TIME_ZONE type in RexNodeToExpressionConverter [flink]

2024-07-24 Thread via GitHub


leonardBang merged PR #25093:
URL: https://github.com/apache/flink/pull/25093


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35363] FLIP-449: Reorganization of flink-connector-jdbc [flink-connector-jdbc]

2024-07-24 Thread via GitHub


1996fanrui commented on PR #123:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/123#issuecomment-2247216861

   Thanks @eskabetxe for the contribution, and thanks @RocMarshal for the 
review!
   
   Since both of you are active contributors to [flink-connector-jdbc](https://github.com/apache/flink-connector-jdbc) but are not committers, I will merge this PR if there are no objections before next Monday (in 5 days).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


