Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691974263

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:

@@ -716,6 +695,48 @@ public CheckpointFetchResult fetchCheckpointInfo(

```java
@Override
public CheckpointStatsResult fetchCheckpointStats(
        String jobId, Long checkpointId, Configuration conf) {
    try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
        var checkpointStatusHeaders = CheckpointStatisticDetailsHeaders.getInstance();
        var parameters = checkpointStatusHeaders.getUnresolvedMessageParameters();
        parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));

        // This was needed because the parameter is protected
        var checkpointIdPathParameter =
                (CheckpointIdPathParameter) Iterables.getLast(parameters.getPathParameters());
        checkpointIdPathParameter.resolve(checkpointId);

        var response =
                clusterClient.sendRequest(
                        checkpointStatusHeaders, parameters, EmptyRequestBody.getInstance());

        var stats = response.get();
        if (stats == null) {
            throw new IllegalStateException(
                    String.format(
                            "Checkpoint ID %d for job %s does not exist!", checkpointId, jobId));
        } else if (stats instanceof CheckpointStatistics.CompletedCheckpointStatistics) {
            return CheckpointStatsResult.completed(
                    ((CheckpointStatistics.CompletedCheckpointStatistics) stats)
                            .getExternalPath());
        } else if (stats instanceof CheckpointStatistics.FailedCheckpointStatistics) {
            return CheckpointStatsResult.error(
                    ((CheckpointStatistics.FailedCheckpointStatistics) stats)
                            .getFailureMessage());
        } else if (stats instanceof CheckpointStatistics.PendingCheckpointStatistics) {
            return CheckpointStatsResult.pending();
        } else {
            throw new IllegalArgumentException(
                    String.format(
                            "Unknown checkpoint statistics result class: %s",
                            stats.getClass().getSimpleName()));
        }
    } catch (Exception e) {
        LOG.error("Exception while fetching checkpoint statistics", e);
```

Review Comment: This commit made it so we ignore errors when fetching checkpoint stats, since at that point we have determined that the checkpoint was successful anyway. We just set an empty path: https://github.com/apache/flink-kubernetes-operator/pull/821/commits/637b3053b9644b3d6cadd1fb6a05e1f5aab75fa6

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
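The fallback described in the linked commit (swallow a stats-fetch failure once the checkpoint is already known to be successful, and use an empty path instead) can be sketched with a minimal, self-contained model. `StatsFetcher` and `resolvePathOrEmpty` are illustrative names introduced here, not operator APIs:

```java
// Sketch (assumed names, not operator code): if fetching checkpoint
// statistics fails after the checkpoint itself is known to be successful,
// return an empty path instead of propagating the error.
public class CheckpointPathFallback {

    @FunctionalInterface
    interface StatsFetcher {
        String fetchExternalPath(String jobId, long checkpointId) throws Exception;
    }

    static String resolvePathOrEmpty(StatsFetcher fetcher, String jobId, long checkpointId) {
        try {
            String path = fetcher.fetchExternalPath(jobId, checkpointId);
            return path == null ? "" : path;
        } catch (Exception e) {
            // Checkpoint already completed; treat the stats lookup as best-effort.
            return "";
        }
    }

    public static void main(String[] args) {
        // Successful lookup returns the external path.
        System.out.println(resolvePathOrEmpty((j, c) -> "s3://bucket/chk-42", "job-1", 42L));
        // A failing lookup is swallowed and yields an empty path.
        System.out.println("'" + resolvePathOrEmpty(
                (j, c) -> { throw new RuntimeException("REST error"); }, "job-1", 42L) + "'");
    }
}
```

This mirrors only the error-handling decision under discussion; the real code additionally logs the exception before falling back.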
Re: [PR] [FLINK-35736][tests] Add migration test scripts & CI workflows [flink-cdc]
morazow commented on code in PR #3447: URL: https://github.com/apache/flink-cdc/pull/3447#discussion_r1691956059

## tools/mig-test/README.md:

@@ -0,0 +1,36 @@

# Flink CDC Migration Test Utilities

## Pipeline Jobs

### Preparation

1. Install Ruby (macOS has embedded it by default)
2. (Optional) Run `gem install terminal-table` for better display

### Compile snapshot CDC versions

3. Set `CDC_SOURCE_HOME` to the root directory of the Flink CDC git repository
4. Go to `tools/mig-test` and run `ruby prepare_libs.rb` to download released / compile snapshot CDC versions

### Run migration tests

5. Enter `conf/` and run `docker compose up -d` to start up test containers
6. Set `FLINK_HOME` to the home directory of Flink
7. Go back to `tools/mig-test` and run `ruby run_migration_test.rb` to start testing

### Result

The migration result will be displayed in the console like this:

```
+--------------+-------+-------+-------+--------------+--------------+
|                    Migration Test Result                           |
+--------------+-------+-------+-------+--------------+--------------+
|              | 3.0.0 | 3.0.1 | 3.1.0 | 3.1-SNAPSHOT | 3.2-SNAPSHOT |
| 3.0.0        | ❓    | ❓    | ❌    | ✅           | ✅           |
| 3.0.1        |       | ❓    | ❌    | ✅           | ✅           |
| 3.1.0        |       |       | ✅    | ❌           | ❌           |
| 3.1-SNAPSHOT |       |       |       | ✅           | ✅           |
| 3.2-SNAPSHOT |       |       |       |              | ✅           |
+--------------+-------+-------+-------+--------------+--------------+
```

Review Comment: Could we add the meaning of `❓` here?
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691962117

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java:

@@ -0,0 +1,382 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.kubernetes.operator.utils;

import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.CrdConstants;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobReference;
import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nullable;

import java.time.Instant;
import java.util.UUID;

import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;

/** Utilities class for FlinkStateSnapshot resources. */
public class FlinkStateSnapshotUtils {

    /**
     * From a snapshot reference, return its snapshot path. If a {@link FlinkStateSnapshot} is
     * referenced, it will be retrieved from Kubernetes.
     *
     * @param kubernetesClient kubernetes client
     * @param snapshotRef snapshot reference
     * @return found savepoint path
     */
    public static String getValidatedFlinkStateSnapshotPath(
            KubernetesClient kubernetesClient, FlinkStateSnapshotReference snapshotRef) {
        if (StringUtils.isNotBlank(snapshotRef.getPath())) {
            return snapshotRef.getPath();
        }

        if (StringUtils.isBlank(snapshotRef.getName())) {
            throw new IllegalArgumentException(
                    String.format("Invalid snapshot name: %s", snapshotRef.getName()));
        }

        var result =
                snapshotRef.getNamespace() == null
                        ? kubernetesClient
                                .resources(FlinkStateSnapshot.class)
                                .withName(snapshotRef.getName())
                                .get()
                        : kubernetesClient
                                .resources(FlinkStateSnapshot.class)
                                .inNamespace(snapshotRef.getNamespace())
                                .withName(snapshotRef.getName())
                                .get();

        if (result == null) {
            throw new IllegalStateException(
                    String.format(
```
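The resolution order in `getValidatedFlinkStateSnapshotPath` (an explicit path wins; otherwise the snapshot is looked up by name, and a blank name is rejected) can be modeled with this self-contained sketch. `SnapshotRefResolution`, `resolvePath`, and the in-memory `Map` standing in for the Kubernetes lookup are illustrative assumptions, not operator code:

```java
import java.util.Map;

// Sketch of the reference-resolution precedence: explicit path first, then
// a lookup by name (modeled here by a Map instead of a Kubernetes client).
public class SnapshotRefResolution {

    static String resolvePath(String path, String name, Map<String, String> store) {
        if (path != null && !path.isBlank()) {
            return path; // an explicit path takes precedence over the reference
        }
        if (name == null || name.isBlank()) {
            throw new IllegalArgumentException("Invalid snapshot name: " + name);
        }
        String found = store.get(name); // stands in for the Kubernetes lookup
        if (found == null) {
            throw new IllegalStateException("Snapshot not found: " + name);
        }
        return found;
    }

    public static void main(String[] args) {
        Map<String, String> store = Map.of("snap-1", "s3://bucket/savepoint-1");
        System.out.println(resolvePath("s3://explicit", null, store));
        System.out.println(resolvePath(null, "snap-1", store));
    }
}
```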
Re: [PR] [FLINK-30481][FLIP-277] GlueCatalog Implementation [flink-connector-aws]
Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1691963997

## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java:

@@ -0,0 +1,384 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.catalog.glue.util;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.Database;
import software.amazon.awssdk.services.glue.model.GlueResponse;
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.UserDefinedFunction;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;

/** Utilities related glue Operation. */
@Internal
public class GlueUtils {

    private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class);

    /**
     * Glue supports lowercase naming convention.
     *
     * @param name fully qualified name.
     * @return modified name according to glue convention.
     */
    public static String getGlueConventionalName(String name) {
        return name.toLowerCase(Locale.ROOT);
    }

    /**
     * Extract location from database properties if present and remove location from properties.
     * fallback to create default location if not present
     *
     * @param databaseProperties database properties.
     * @param databaseName fully qualified name for database.
     * @return location for database.
     */
    public static String extractDatabaseLocation(
            final Map<String, String> databaseProperties,
            final String databaseName,
            final String catalogPath) {
        if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
            return databaseProperties.remove(GlueCatalogConstants.LOCATION_URI);
        } else {
            LOG.info("No location URI Set. Using Catalog Path as default");
            return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + databaseName;
        }
    }

    /**
     * Extract location from database properties if present and remove location from properties.
     * fallback to create default location if not present
     *
     * @param tableProperties table properties.
     * @param tablePath fully qualified object for table.
     * @return location for table.
     */
    public static String extractTableLocation(
            final Map<String, String> tableProperties,
            final ObjectPath tablePath,
            final String catalogPath) {
        if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
            return tableProperties.remove(GlueCatalogConstants.LOCATION_URI);
        } else {
            return catalogPath
                    + GlueCatalogConstants.LOCATION_SEPARATOR
                    + tablePath.getDatabaseName()
                    + GlueCatalogConstants.LOCATION_SEPARATOR
                    +
```
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2251149507

> @19priyadhingra The tests seem to be failing. Can we please take a look?
>
> Also - it would be good if we squash the commits!

1. Hi Hong, I dug deeper into the only failing test, `elementConverterWillOpenSerializationSchema` (failure logs: https://paste.amazon.com/show/dhipriya/1721770192). It complains that `TestSinkInitContext` cannot be cast to `sink2.Sink$InitContext`. I understand why it is failing, but I am not sure how to fix it. The issue started after 1.19.1, where `TestSinkInitContext` was changed. One easy option would be to remove this test if it is not adding much value; the other fix might require a change in `flink-connector-base` so that we keep the old `TestSinkInitContext`. For now I have removed that test; please let me know your thoughts on this.

   The new `TestSinkInitContext` ([https://github.com/apache/flink/blob/release-1.19.1/flink-connectors/flink-connect[…]pache/flink/connector/base/sink/writer/TestSinkInitContext.java](https://github.com/apache/flink/blob/release-1.19.1/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java#L51)) implements `WriterInitContext`, whereas the old `TestSinkInitContext` ([https://github.com/apache/flink/blob/release-1.18.1/flink-connectors/flink-connect[…]pache/flink/connector/base/sink/writer/TestSinkInitContext.java](https://github.com/apache/flink/blob/release-1.18.1/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java#L47)) implements `Sink.InitContext`. The new [WriterInitContext](https://github.com/apache/flink/blob/release-1.19.1/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java#L35) extends `org.apache.flink.api.connector.sink2.InitContext`, not the required `org.apache.flink.api.connector.sink2.Sink.InitContext`.

2. Squashed the commits.
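The cast failure described above can be reproduced with a simplified, self-contained model. The interfaces below are stand-ins introduced for illustration, not the real Flink types: a context that implements only the new `WriterInitContext`-style interface is not an instance of the old `Sink.InitContext`-style interface, so the downcast in the test throws `ClassCastException`:

```java
// Simplified model of the 1.18 -> 1.19 incompatibility. OldSinkInitContext
// stands in for Sink.InitContext (pre-1.19), NewWriterInitContext for
// WriterInitContext (1.19+). They share no subtype relationship, so an
// object of the new type cannot be cast to the old one.
public class InitContextModel {

    interface OldSinkInitContext {}   // stand-in for Sink.InitContext (<= 1.18)
    interface NewWriterInitContext {} // stand-in for WriterInitContext (>= 1.19)

    static class TestSinkInitContext implements NewWriterInitContext {}

    static boolean isOldContext(Object ctx) {
        return ctx instanceof OldSinkInitContext;
    }

    public static void main(String[] args) {
        Object ctx = new TestSinkInitContext();
        System.out.println(isOldContext(ctx)); // prints false
        try {
            OldSinkInitContext old = (OldSinkInitContext) ctx;
            System.out.println("cast succeeded: " + old);
        } catch (ClassCastException e) {
            // This is the same failure mode the test hits with the real types.
            System.out.println("ClassCastException, as in the failing test");
        }
    }
}
```

This is why keeping the old `TestSinkInitContext` would require a change in `flink-connector-base` rather than in the connector under test.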
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691933742

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java:

@@ -0,0 +1,230 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.kubernetes.operator.reconciler.snapshot;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
import org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.apache.flink.util.Preconditions;

import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING;

/** The reconciler for the {@link org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
@RequiredArgsConstructor
public class StateSnapshotReconciler {

    private static final Logger LOG = LoggerFactory.getLogger(StateSnapshotReconciler.class);

    private final FlinkResourceContextFactory ctxFactory;
    private final EventRecorder eventRecorder;

    public void reconcile(FlinkStateSnapshotContext ctx) {
        var resource = ctx.getResource();

        var savepointState = resource.getStatus().getState();
        if (!TRIGGER_PENDING.equals(savepointState)) {
            return;
        }

        if (resource.getSpec().isSavepoint()
                && resource.getSpec().getSavepoint().getAlreadyExists()) {
            LOG.info(
                    "Snapshot {} is marked as completed in spec, skipping triggering savepoint.",
                    resource.getMetadata().getName());

            FlinkStateSnapshotUtils.snapshotSuccessful(
                    resource, resource.getSpec().getSavepoint().getPath(), true);
            return;
        }

        if (FlinkStateSnapshotUtils.abandonSnapshotIfJobNotRunning(
                ctx.getKubernetesClient(),
                ctx.getResource(),
                ctx.getSecondaryResource().orElse(null),
                eventRecorder)) {
            return;
        }

        var jobId = ctx.getSecondaryResource().orElseThrow().getStatus().getJobStatus().getJobId();

        Optional<String> triggerIdOpt;
        try {
            triggerIdOpt = triggerCheckpointOrSavepoint(resource.getSpec(), ctx, jobId);
        } catch (Exception e) {
            LOG.error("Failed to trigger snapshot for resource {}", ctx.getResource(), e);
            throw new ReconciliationException(e);
        }

        if (triggerIdOpt.isEmpty()) {
            LOG.warn("Failed to trigger snapshot {}", resource.getMetadata().getName());
            return;
        }

        FlinkStateSnapshotUtils.snapshotInProgress(resource, triggerIdOpt.get());
    }

    public DeleteControl cleanup(FlinkStateSnapshotContext ctx) throws Exception {
        var resource = ctx.getResource();
        var state = resource.getStatus().getState();
        var resourceName = resource.getMetadata().getName();
        LOG.info("Cleaning up resource {}...", resourceName);

        if (resource.getSpec().isCheckpoint()) {
            return DeleteControl.defaultDelete();
        }
        if (!resource.getSpec().getSavepoint().getDisposeOnDelete()) {
            return
```
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
anupamaggarwal commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691868012

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java:

@@ -0,0 +1,382 @@ (same hunk as quoted above)
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
anupamaggarwal commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1691872393

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java:

@@ -0,0 +1,230 @@ (same hunk as quoted above)
Re: [PR] [FLINK-35897][Checkpoint] Cleanup completed resource when checkpoint is canceled [flink]
flinkbot commented on PR #25120: URL: https://github.com/apache/flink/pull/25120#issuecomment-2250449661 ## CI report: * 5b0ca74d3951873a800d5b3ef0d23736f6fab755 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35897][Checkpoint] Cleanup completed resource when checkpoint is canceled [flink]
ljz2051 opened a new pull request, #25120: URL: https://github.com/apache/flink/pull/25120 ## What is the purpose of the change This pull request cleans up completed checkpoint resources when the job's checkpoint is canceled. ## Brief change log - When the asynchronous snapshot thread completes a checkpoint, clean up the completed checkpoint if it finds that the checkpoint has been canceled. ## Verifying this change This change is already covered by existing tests, such as AsyncSnapshotCallableTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? no
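The race described in the change log — cancellation arriving while the asynchronous snapshot thread is still finishing — can be sketched as follows. This is a minimal illustration of the idea only, not Flink's actual `AsyncSnapshotCallable` API; the `uploadSnapshot`/`discard` helpers and the returned handle are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch: an async snapshot task that, after finishing its work,
// re-checks the cancellation flag and discards the just-completed resource
// instead of handing it over -- otherwise a cancel that raced with
// completion would leak the uploaded files.
final class AsyncSnapshotTask implements Callable<String> {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    void cancel() {
        cancelled.set(true);
    }

    @Override
    public String call() {
        String handle = uploadSnapshot(); // long-running upload of state files
        if (cancelled.get()) {
            discard(handle); // cleanup of the completed checkpoint resource
            throw new CancellationException("snapshot was cancelled");
        }
        return handle;
    }

    private String uploadSnapshot() {
        return "s3://bucket/chk-42"; // hypothetical handle
    }

    private void discard(String handle) {
        // delete the uploaded files; no-op in this sketch
    }
}
```

The key design point is that the cleanup happens on the snapshot thread itself, right after completion, so no resource exists without an owner.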
Re: [PR] [FLINK-35885][table] Ignore advancing window processor by watermark in window agg based on proctime [flink]
flinkbot commented on PR #25119: URL: https://github.com/apache/flink/pull/25119#issuecomment-2250260386 ## CI report: * f529368b2ff98c7a50d5e790c9b9ca879ca200a8 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-35885][table] Ignore advancing window processor by watermark in window agg based on proctime [flink]
xuyangzhong opened a new pull request, #25119: URL: https://github.com/apache/flink/pull/25119 ## What is the purpose of the change This PR ignores watermark-driven advancement of the window processor in processing-time (proctime) window aggregations. ## Brief change log - *Ignore the advancement of the window processor caused by watermarks in proctime window agg* - *Add a harness test for this bugfix* ## Verifying this change A harness test has been added to verify this PR. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented?
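The intended behavior can be sketched as below. The class and method names are illustrative stand-ins, not the planner's actual window processor classes: the point is only that a processing-time window advances on proctime timers and never on watermarks.

```java
// Sketch of the bugfix's idea: a window processor keyed to processing time
// must only be advanced by proctime timers; watermarks (which still flow
// through the operator) are ignored for it.
final class WindowProgressSketch {
    private final boolean eventTime;
    private long progress = Long.MIN_VALUE;

    WindowProgressSketch(boolean eventTime) {
        this.eventTime = eventTime;
    }

    /** Called on each incoming watermark; returns whether windows advanced. */
    boolean advanceOnWatermark(long watermark) {
        if (!eventTime) {
            return false; // the fix: proctime windows ignore watermarks
        }
        if (watermark > progress) {
            progress = watermark;
            return true;
        }
        return false;
    }

    /** Called from processing-time timers; drives proctime windows only. */
    boolean advanceOnProcessingTime(long timestamp) {
        if (eventTime || timestamp <= progress) {
            return false;
        }
        progress = timestamp;
        return true;
    }
}
```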
Re: [PR] [FLINK-35282][FLINK-35520] PyFlink Support for Apache Beam > 2.49 [flink]
xaniasd commented on PR #24908: URL: https://github.com/apache/flink/pull/24908#issuecomment-2250200976 hi there @snuyanzin @hlteoh37, are you actively working on this PR? Is there any way I could help with this?
Re: [PR] [FLINK-35894] Add Elasticsearch Sink Connector for Flink CDC Pipeline [flink-cdc]
lvyanquan commented on PR #3495: URL: https://github.com/apache/flink-cdc/pull/3495#issuecomment-2250180097 Thanks @proletarians for this contribution; I left some comments.
Re: [PR] [FLINK-35894] Add Elasticsearch Sink Connector for Flink CDC Pipeline [flink-cdc]
lvyanquan commented on code in PR #3495: URL: https://github.com/apache/flink-cdc/pull/3495#discussion_r1691213209 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml: ## @@ -0,0 +1,211 @@ +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + +4.0.0 + + +org.apache.flink +flink-cdc-pipeline-connectors +3.2-SNAPSHOT Review Comment: please use `${revision}` like other connectors. ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java: ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.elasticsearch.serializer; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.*; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.*; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** A serializer for Event to BulkOperationVariant. */ +public class ElasticsearchEventSerializer implements ElementConverter { +private final ObjectMapper objectMapper = new ObjectMapper(); +private final Map schemaMaps = new HashMap<>(); + +/** Format DATE type data. */ +public static final DateTimeFormatter DATE_FORMATTER = +DateTimeFormatter.ofPattern("-MM-dd"); + +/** Format timestamp-related type data. */ +public static final DateTimeFormatter DATE_TIME_FORMATTER = +DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss.SS"); + +/** ZoneId from pipeline config to support timestamp with local time zone. 
*/ +private final ZoneId pipelineZoneId; + +public ElasticsearchEventSerializer(ZoneId zoneId) { +this.pipelineZoneId = zoneId; +} + +@Override +public BulkOperationVariant apply(Event event, SinkWriter.Context context) { +try { +if (event instanceof DataChangeEvent) { +return applyDataChangeEvent((DataChangeEvent) event); +} else if (event instanceof SchemaChangeEvent) { +IndexOperation> indexOperation = +applySchemaChangeEvent((SchemaChangeEvent) event); +if (indexOperation != null) { +return indexOperation; +} +} +} catch (IOException e) { +throw new RuntimeException("Failed to serialize event", e); +} +return null; +} + +private IndexOperation> applySchemaChangeEvent( +SchemaChangeEvent schemaChangeEvent) throws IOException { +TableId tableId = schemaChangeEvent.tableId(); +if (schemaChangeEvent instanceof CreateTableEvent) { +Schema schema = ((CreateTableEvent) schemaChangeEvent).getSchema(); +schemaMaps.put(tableId, schema); +return createSchemaIndexOperation(tableId, schema); +} else if (schemaChangeEvent instanceof AddColumnEvent +|| schemaChangeEvent instanceof DropColumnEvent) { +if
Re: [PR] [FLINK-35868][cdc-connector][mongodb] Bump dependency version to support MongoDB 7.0 [flink-cdc]
leonardBang merged PR #3489: URL: https://github.com/apache/flink-cdc/pull/3489
Re: [PR] [FLINK-35391][cdc-connector][paimon] Bump dependency of Paimon Pipeline connector to 0.8.0 [flink-cdc]
beryllw commented on PR #3335: URL: https://github.com/apache/flink-cdc/pull/3335#issuecomment-2249967829 @leonardBang It seems the unit-test CI failure has nothing to do with the Paimon changes.
Re: [PR] [FLINK-35016] catalog changes for model resource [flink]
twalthr commented on code in PR #25036: URL: https://github.com/apache/flink/pull/25036#discussion_r1690005374 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java: ## @@ -790,4 +792,126 @@ void alterPartitionColumnStatistics( CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException; + +// -- models -- + +/** + * Get names of all tables models under this database. An empty list is returned if none exists. + * + * @return a list of the names of all models in this database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ +default List listModels(String databaseName) +throws DatabaseNotExistException, CatalogException { +throw new UnsupportedOperationException( +String.format("listModel(String) is not implemented for %s.", this.getClass())); Review Comment: Let's return an empty list instead. If possible we should avoid errors to make the system more robust. At least on the read path, not necessarily on the write path. ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java: ## @@ -790,4 +792,126 @@ void alterPartitionColumnStatistics( CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException; + +// -- models -- + +/** + * Get names of all tables models under this database. An empty list is returned if none exists. Review Comment: ```suggestion * Get names of all models under this database. An empty list is returned if none exists. ``` ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogModel.java: ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.Schema; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Interface for a model in a catalog. */ +@PublicEvolving +public interface CatalogModel { +/** Returns a map of string-based model options. */ +Map getOptions(); + +/** Returns a list of model changes. */ +List getModelChanges(); + +/** + * Get the unresolved input schema of the model. + * + * @return unresolved input schema of the model. + */ +Schema getInputSchema(); + +/** + * Get the unresolved output schema of the model. + * + * @return unresolved output schema of the model. + */ +Schema getOutputSchema(); + +/** + * Get comment of the model. + * + * @return comment of the model. + */ +String getComment(); + +/** + * Get a deep copy of the CatalogModel instance. + * + * @return a copy of the CatalogModel instance + */ +CatalogModel copy(); + +/** + * Copy the input model options into the CatalogModel instance. + * + * @return a copy of the CatalogModel instance with new model options. + */ +CatalogModel copy(Map options); + +/** + * Creates a basic implementation of this interface. 
+ * + * @param inputSchema unresolved input schema + * @param outputSchema unresolved output schema + * @param modelOptions model options + * @param comment optional comment + */ +static CatalogModel of( +Schema inputSchema, +Schema outputSchema, +Map modelOptions, +@Nullable String comment) { +return new DefaultCatalogModel( +inputSchema, outputSchema, modelOptions, new ArrayList<>(), comment); +} + +/** + * Creates a basic implementation of this interface. + * + * @param inputSchema unresolved input schema + * @param outputSchema unresolved output schema + * @param modelChanges
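The suggestion above — stay lenient on the read path, fail loudly only on the write path — can be sketched like this. `ModelCatalog` is a simplified, hypothetical stand-in for the real `Catalog` interface, which carries many more methods and checked exceptions.

```java
import java.util.Collections;
import java.util.List;

// Sketch of the review suggestion: a catalog that does not implement models
// should stay robust on the read path (report "no models") and may fail
// loudly only on the write path.
interface ModelCatalog {
    default List<String> listModels(String databaseName) {
        // Read path: an unsupported feature looks like an empty database.
        return Collections.emptyList();
    }

    default void createModel(String databaseName, String modelName) {
        // Write path: throwing keeps misuse visible.
        throw new UnsupportedOperationException(
                String.format(
                        "createModel(String, String) is not implemented for %s.",
                        getClass()));
    }
}
```

This keeps generic tooling (e.g. a "show models" listing across catalogs) working against catalogs that never opted into the feature.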
Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
RocMarshal commented on code in PR #25113: URL: https://github.com/apache/flink/pull/25113#discussion_r1691120384 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java: ## @@ -88,7 +88,22 @@ CompletableFuture registerJobMaster( CompletableFuture declareRequiredResources( JobMasterId jobMasterId, ResourceRequirements resourceRequirements, -@RpcTimeout Time timeout); +@RpcTimeout Duration timeout); + +/** + * @deprecated Use {@link #declareRequiredResources(JobMasterId, ResourceRequirements, + * Duration)}. Declares the absolute resource requirements for a job. + * @param jobMasterId id of the JobMaster + * @param resourceRequirements resource requirements + * @return The confirmation that the requirements have been processed + */ +@Deprecated +default CompletableFuture declareRequiredResources( Review Comment: sounds good~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35847][release] Add release note for version 1.20 [flink]
superdiaodiao commented on code in PR #25091: URL: https://github.com/apache/flink/pull/25091#discussion_r1691112012 ## docs/content.zh/release-notes/flink-1.20.md: ## @@ -0,0 +1,434 @@ +--- +title: "Release Notes - Flink 1.20" +--- + + +# Release notes - Flink 1.20 + +These release notes discuss important aspects, such as configuration, behavior or dependencies, +that changed between Flink 1.19 and Flink 1.20. Please read these notes carefully if you are +planning to upgrade your Flink version to 1.20. + +### Checkpoints + + Unified File Merging Mechanism for Checkpoints + +# [FLINK-32070](https://issues.apache.org/jira/browse/FLINK-32070) + +The unified file merging mechanism for checkpointing is introduced to Flink 1.20 as an MVP ("minimum viable product") +feature, which allows scattered small checkpoint files to be written into larger files, reducing the +number of file creations and file deletions and alleviating the pressure of file system metadata management +raised by the file flooding problem during checkpoints. The mechanism can be enabled by setting +`state.checkpoints.file-merging.enabled` to `true`. For more advanced options and principle behind +this feature, please refer to the document of `Checkpointing`. + + Reorganize State & Checkpointing & Recovery Configuration + +# [FLINK-34255](https://issues.apache.org/jira/browse/FLINK-34255) + +Currently, all the options about state and checkpointing are reorganized and categorized by +prefixes as listed below: + +1. execution.checkpointing: all configurations associated with checkpointing and savepoint. +2. execution.state-recovery: all configurations pertinent to state recovery. +3. state.*: all configurations related to the state accessing. +1. state.backend.*: specific options for individual state backends, such as RocksDB. +2. state.changelog: configurations for the changelog, as outlined in FLIP-158, including the options for the "Durable Short-term Log" (DSTL). +3. 
state.latency-track: configurations related to the latency tracking of state access. + +At the meantime, all the original options scattered everywhere are annotated as `@Deprecated`. + + Use common thread pools when transferring RocksDB state files + +# [FLINK-35501](https://issues.apache.org/jira/browse/FLINK-35501) + +The semantics of `state.backend.rocksdb.checkpoint.transfer.thread.num` changed slightly: +If negative, the common (TM) IO thread pool is used (see `cluster.io-pool.size`) for up/downloading RocksDB files. + + Expose RocksDB bloom filter metrics + +# [FLINK-34386](https://issues.apache.org/jira/browse/FLINK-34386) + +We expose some RocksDB bloom filter metrics to monitor the effectiveness of bloom filter optimization: + +`BLOOM_FILTER_USEFUL`: times bloom filter has avoided file reads. +`BLOOM_FILTER_FULL_POSITIVE`: times bloom FullFilter has not avoided the reads. +`BLOOM_FILTER_FULL_TRUE_POSITIVE`: times bloom FullFilter has not avoided the reads and data actually exist. + + Manually Compact Small SST Files + +# [FLINK-26050](https://issues.apache.org/jira/browse/FLINK-26050) + +In some cases, the number of files produced by RocksDB state backend grows indefinitely.This might +cause task state info (TDD and checkpoint ACK) to exceed RPC message size and fail recovery/checkpoint +in addition to having lots of small files. + +In Flink 1.20, you can manually merge such files in the background using RocksDB API. + +### Runtime & Coordination + + Support Job Recovery from JobMaster Failures for Batch Jobs + +# [FLINK-33892](https://issues.apache.org/jira/browse/FLINK-33892) + +In 1.20, we introduced a batch job recovery mechanism to enable batch jobs to recover as much progress as possible +after a JobMaster failover, avoiding the need to rerun tasks that have already been finished. 
+ +More information about this feature and how to enable it could be found in: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/batch/recovery_from_job_master_failure/ + + Extend Curator config option for Zookeeper configuration + +# [FLINK-33376](https://issues.apache.org/jira/browse/FLINK-33376) + +Adds support for the following curator parameters: +`high-availability.zookeeper.client.authorization` (corresponding curator parameter: `authorization`), +`high-availability.zookeeper.client.max-close-wait` (corresponding curator parameter: `maxCloseWaitMs`), +`high-availability.zookeeper.client.simulated-session-expiration-percent` (corresponding curator parameter: `simulatedSessionExpirationPercent`). + + More fine-grained timer processing + +# [FLINK-20217](https://issues.apache.org/jira/browse/FLINK-20217) + +Firing timers can now be interrupted to speed up checkpointing. Timers that were interrupted by a checkpoint, +will be fired shortly after checkpoint completes. + +By default, this features is disabled. To enabled it please set
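The two checkpoint-related options called out in the quoted release notes can be collected as plain key/value pairs. The values below are examples only — a real deployment would set them in the Flink configuration file — and the comments restate what the notes above say each key does.

```java
import java.util.Properties;

// The release-note options above, as key/value pairs. "true" enables the
// 1.20 file-merging MVP for checkpoints; a negative RocksDB transfer thread
// count makes state-file up/downloads use the common TM IO pool
// (cluster.io-pool.size) instead of a dedicated pool.
public final class CheckpointConfExample {
    static Properties releaseNoteOptions() {
        Properties conf = new Properties();
        conf.setProperty("state.checkpoints.file-merging.enabled", "true");
        conf.setProperty("state.backend.rocksdb.checkpoint.transfer.thread.num", "-1");
        return conf;
    }
}
```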
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1691113488 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java: ## @@ -69,46 +69,29 @@ class LocalInputPreferredSlotSharingStrategyTest { private TestingSchedulingExecutionVertex ev21; private TestingSchedulingExecutionVertex ev22; -private Set slotSharingGroups; - -@BeforeEach -void setUp() { -topology = new TestingSchedulingTopology(); - +private void setupCase() { ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0); ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1); ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0); ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1); -final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); -slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_1); -slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_2); -slotSharingGroups = Collections.singleton(slotSharingGroup); +slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_1); +slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_2); } -@Test -void testCoLocationConstraintIsRespected() { -topology.connect(ev11, ev22); -topology.connect(ev12, ev21); - -final CoLocationGroup coLocationGroup = -new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2); -final Set coLocationGroups = Collections.singleton(coLocationGroup); - -final SlotSharingStrategy strategy = -new LocalInputPreferredSlotSharingStrategy( -topology, slotSharingGroups, coLocationGroups); - -assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2); - assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds()) -.contains(ev11.getId(), ev21.getId()); - assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getExecutionVertexIds()) -.contains(ev12.getId(), ev22.getId()); +@Override +protected SlotSharingStrategy getSlotSharingStrategy( +SchedulingTopology topology, +Set slotSharingGroups, +Set coLocationGroups) { 
+return new LocalInputPreferredSlotSharingStrategy( +topology, slotSharingGroups, coLocationGroups); } @Test void testInputLocalityIsRespectedWithRescaleEdge() { +setupCase(); Review Comment: Thank @1996fanrui you very much for the review , I updated related parts based on your comments. PTAL if you had the free time. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]
superdiaodiao commented on code in PR #25109: URL: https://github.com/apache/flink/pull/25109#discussion_r1691055255 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MathFunctionsITCase.java: ## @@ -141,4 +178,181 @@ Stream getTestSetSpecs() { new BigDecimal("123.45"), DataTypes.DECIMAL(6, 2).notNull())); } + +private Stream convTestCases() { +return Stream.of( +TestSetSpec.forFunction(BuiltInFunctionDefinitions.CONV) +.onFieldsWithData( +null, null, "100", "", "11abc", "\u000B\f 4521 \n\r\t") +.andDataTypes( +DataTypes.INT(), +DataTypes.STRING(), +DataTypes.STRING(), +DataTypes.STRING(), +DataTypes.STRING(), +DataTypes.STRING()) +// null input +.testResult($("f0").conv(2, 2), "CONV(f0, 2, 2)", null, DataTypes.STRING()) +.testResult($("f1").conv(2, 2), "CONV(f1, 2, 2)", null, DataTypes.STRING()) +.testResult( +$("f2").conv($("f0"), 2), +"CONV(f2, f0, 2)", +null, +DataTypes.STRING()) +.testResult( +$("f2").conv(2, $("f0")), +"CONV(f2, 2, f0)", +null, +DataTypes.STRING()) +// empty string +.testResult($("f3").conv(2, 4), "CONV(f3, 2, 4)", null, DataTypes.STRING()) +// invalid fromBase +.testResult($("f2").conv(1, 2), "CONV(f2, 1, 2)", null, DataTypes.STRING()) +.testResult( +$("f2").conv(40, 2), "CONV(f2, 40, 2)", null, DataTypes.STRING()) +// invalid toBase +.testResult( +$("f2").conv(2, -1), "CONV(f2, 2, -1)", null, DataTypes.STRING()) +.testResult( +$("f2").conv(2, -40), "CONV(f2, 2, -40)", null, DataTypes.STRING()) +// invalid num format, ignore suffix +.testResult( +$("f4").conv(10, 16), "CONV(f4, 10, 16)", "B", DataTypes.STRING()) +// num overflow +.testTableApiRuntimeError( +lit("").conv(16, 16), +NumberFormatException.class, +"The number overflows.") +.testSqlRuntimeError( +"CONV('', 16, 16)", +NumberFormatException.class, +"The number overflows.") +.testTableApiRuntimeError( +lit("FFFEE").conv(16, 16), +NumberFormatException.class, +"The number FFFEE overflows.") Review Comment: Make sense! 
Thanks~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
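For context on the CONV test cases discussed in this thread, here is a minimal two-step sketch of the `CONV(num, fromBase, toBase)` semantics (decode in `fromBase`, re-encode in `toBase`). This is NOT the PR's implementation: the real function also tolerates trailing garbage (`"11abc"` → `"B"`) and whitespace, which this sketch omits.

```java
public class ConvDemo {
    // Sketch only: decode num in fromBase as an unsigned 64-bit value, then
    // re-encode it in toBase. A negative toBase is treated as "signed output",
    // mirroring the convention the tests above exercise. Returns null for an
    // invalid base, matching the NULL results in the test cases.
    static String conv(String num, int fromBase, int toBase) {
        if (fromBase < 2 || fromBase > 36 || Math.abs(toBase) < 2 || Math.abs(toBase) > 36) {
            return null;
        }
        long v = Long.parseUnsignedLong(num.trim(), fromBase);
        return (toBase > 0 ? Long.toUnsignedString(v, toBase) : Long.toString(v, -toBase))
                .toUpperCase();
    }

    public static void main(String[] args) {
        System.out.println(conv("100", 2, 10));   // 4
        System.out.println(conv("4521", 10, 16)); // 11A9
        System.out.println(conv("100", 1, 2));    // null (fromBase < 2)
    }
}
```

Unlike this sketch, the PR's version must also parse leniently and report overflow explicitly, which is what the `testSqlRuntimeError` cases above pin down.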
Re: [PR] [hotfix][test] Add CDC 3.1.1 release to migration test versions [flink-cdc]
leonardBang merged PR #3426: URL: https://github.com/apache/flink-cdc/pull/3426
Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
1996fanrui commented on code in PR #25113: URL: https://github.com/apache/flink/pull/25113#discussion_r1690930831 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java: ## @@ -88,7 +88,22 @@ CompletableFuture registerJobMaster( CompletableFuture declareRequiredResources( JobMasterId jobMasterId, ResourceRequirements resourceRequirements, -@RpcTimeout Time timeout); +@RpcTimeout Duration timeout); + +/** + * @deprecated Use {@link #declareRequiredResources(JobMasterId, ResourceRequirements, + * Duration)}. Declares the absolute resource requirements for a job. + * @param jobMasterId id of the JobMaster + * @param resourceRequirements resource requirements + * @return The confirmation that the requirements have been processed + */ +@Deprecated +default CompletableFuture declareRequiredResources( Review Comment: We could refactor all callers from Time to Duration. And the old method isn't needed anymore. Because it's not public api, we don't need to mark it to `@Deprecated`. Be careful which commit this change belongs to. ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java: ## @@ -75,21 +76,30 @@ public class DeclarativeSlotPoolService implements SlotPoolService { @Nullable private String jobManagerAddress; private State state = State.CREATED; +protected ComponentMainThreadExecutor componentMainThreadExecutor; Review Comment: ```suggestion protected final ComponentMainThreadExecutor componentMainThreadExecutor; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
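The pattern under discussion — keeping the old overload as a forwarding `default` method during a timeout-type migration — looks roughly like this (simplified, hypothetical names; not the actual `ResourceManagerGateway` signatures). The reviewer's point is that with only a few internal callers it is simpler to migrate them to `Duration` and delete the bridge outright:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class BridgeDemo {
    interface ExampleGateway {
        // New Duration-based method callers should migrate to.
        CompletableFuture<String> declareRequiredResources(String requirements, Duration timeout);

        // Transitional overload: forwards to the new method. For a non-public
        // API with few callers, this bridge can simply be removed instead.
        default CompletableFuture<String> declareRequiredResources(
                String requirements, long timeoutMillis) {
            return declareRequiredResources(requirements, Duration.ofMillis(timeoutMillis));
        }
    }

    static String demo() throws Exception {
        ExampleGateway gw =
                (req, timeout) -> CompletableFuture.completedFuture(req + ":" + timeout.toMillis());
        // Old-style call still works through the default bridge.
        return gw.declareRequiredResources("cpu=2", 500L).get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // cpu=2:500
    }
}
```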
Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
RocMarshal commented on code in PR #25113: URL: https://github.com/apache/flink/pull/25113#discussion_r1690890786 ## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java: ## @@ -154,7 +163,7 @@ private void testRegisterSlots(boolean isBlocked) { BlocklistDeclarativeSlotPoolBuilder.builder() .setBlockedTaskManagerChecker( isBlocked ? taskManager.getResourceID()::equals : ignore -> false) -.build(); +.build(Duration.ZERO, componentMainThreadExecutor); Review Comment: Thanks for the comment. IIUC, this test case has nothing to do with the slot request max interval. Please correct me if I'm wrong, thanks a lot.
Re: [PR] [BP-1.20][FLINK-35095][test] Fix unstable tests in `ExecutionEnvironmentImplTest` [flink]
reswqa merged PR #25111: URL: https://github.com/apache/flink/pull/25111
Re: [PR] [BP-1.20][FLINK-35095][test] Fix unstable tests in `ExecutionEnvironmentImplTest` [flink]
reswqa commented on PR #25111: URL: https://github.com/apache/flink/pull/25111#issuecomment-2249702180 Already reviewed in https://github.com/apache/flink/pull/25085.
Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]
dylanhz commented on code in PR #25109: URL: https://github.com/apache/flink/pull/25109#discussion_r1691012678 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MathFunctionsITCase.java: ## @@ -141,4 +178,181 @@ Stream getTestSetSpecs() { new BigDecimal("123.45"), DataTypes.DECIMAL(6, 2).notNull())); } + +private Stream convTestCases() { +return Stream.of( +TestSetSpec.forFunction(BuiltInFunctionDefinitions.CONV) +.onFieldsWithData( +null, null, "100", "", "11abc", "\u000B\f 4521 \n\r\t") +.andDataTypes( +DataTypes.INT(), +DataTypes.STRING(), +DataTypes.STRING(), +DataTypes.STRING(), +DataTypes.STRING(), +DataTypes.STRING()) +// null input +.testResult($("f0").conv(2, 2), "CONV(f0, 2, 2)", null, DataTypes.STRING()) +.testResult($("f1").conv(2, 2), "CONV(f1, 2, 2)", null, DataTypes.STRING()) +.testResult( +$("f2").conv($("f0"), 2), +"CONV(f2, f0, 2)", +null, +DataTypes.STRING()) +.testResult( +$("f2").conv(2, $("f0")), +"CONV(f2, 2, f0)", +null, +DataTypes.STRING()) +// empty string +.testResult($("f3").conv(2, 4), "CONV(f3, 2, 4)", null, DataTypes.STRING()) +// invalid fromBase +.testResult($("f2").conv(1, 2), "CONV(f2, 1, 2)", null, DataTypes.STRING()) +.testResult( +$("f2").conv(40, 2), "CONV(f2, 40, 2)", null, DataTypes.STRING()) +// invalid toBase +.testResult( +$("f2").conv(2, -1), "CONV(f2, 2, -1)", null, DataTypes.STRING()) +.testResult( +$("f2").conv(2, -40), "CONV(f2, 2, -40)", null, DataTypes.STRING()) +// invalid num format, ignore suffix +.testResult( +$("f4").conv(10, 16), "CONV(f4, 10, 16)", "B", DataTypes.STRING()) +// num overflow +.testTableApiRuntimeError( +lit("").conv(16, 16), +NumberFormatException.class, +"The number overflows.") +.testSqlRuntimeError( +"CONV('', 16, 16)", +NumberFormatException.class, +"The number overflows.") +.testTableApiRuntimeError( +lit("FFFEE").conv(16, 16), +NumberFormatException.class, +"The number FFFEE overflows.") Review Comment: There are three distinct numbers involved in 
overflow test cases:
- ``, which equates to -1, will not trigger overflow in encode(). However, since -1 is inherently treated as an overflow flag, it ends up throwing an exception.
- `FFFEE` will trigger overflow in encode() line 97.
- `18446744073709551616` will trigger overflow in encode() line 105.

So these numbers are actually very special boundary cases and I think they should be kept.
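The range mechanics behind these boundary cases can be reproduced with the JDK's unsigned-long parsing (a sketch of the boundaries only, not the PR's `encode()`):

```java
public class UnsignedBoundary {
    // 2^64 - 1 parses fine as an unsigned long; its two's-complement bit
    // pattern is -1L, which is why a -1 "overflow flag" sentinel can collide
    // with a perfectly legal value.
    static long maxUnsigned() {
        return Long.parseUnsignedLong("18446744073709551615");
    }

    // 2^64 is one past the unsigned 64-bit range and throws NumberFormatException.
    static boolean overflows(String s) {
        try {
            Long.parseUnsignedLong(s);
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(maxUnsigned() == -1L);                 // true
        System.out.println(Long.toUnsignedString(maxUnsigned())); // 18446744073709551615
        System.out.println(overflows("18446744073709551616"));    // true
    }
}
```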
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1690999244 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java: ## @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.utils; + +import org.apache.flink.autoscaler.utils.DateTimeUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.api.CrdConstants; +import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec; +import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; +import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec; +import org.apache.flink.kubernetes.operator.api.spec.JobReference; +import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec; +import org.apache.flink.kubernetes.operator.api.status.CheckpointType; +import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType; +import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.reconciler.SnapshotType; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.time.Instant; +import java.util.UUID; + +import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED; +import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED; +import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS; +import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING; +import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED; +import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT; +import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT; + +/** Utilities class for FlinkStateSnapshot resources. */ +public class FlinkStateSnapshotUtils { + +/** + * From a snapshot reference, return its snapshot path. If a {@link FlinkStateSnapshot} is + * referenced, it will be retrieved from Kubernetes. + * + * @param kubernetesClient kubernetes client + * @param snapshotRef snapshot reference + * @return found savepoint path + */ +public static String getAndValidateFlinkStateSnapshotPath( +KubernetesClient kubernetesClient, FlinkStateSnapshotReference snapshotRef) { +if (!StringUtils.isBlank(snapshotRef.getPath())) { +return snapshotRef.getPath(); +} + +if (StringUtils.isBlank(snapshotRef.getName())) { +throw new IllegalArgumentException( +String.format("Invalid snapshot name: %s", snapshotRef.getName())); +} + +FlinkStateSnapshot result; +if (snapshotRef.getName() != null) { +var namespace = snapshotRef.getNamespace(); +if (namespace == null) { +result = +kubernetesClient +.resources(FlinkStateSnapshot.class) +.withName(snapshotRef.getName()) +.get(); +} else { +result = +kubernetesClient +.resources(FlinkStateSnapshot.class) +.inNamespace(namespace) +.withName(snapshotRef.getName()) +
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1690995181 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ## @@ -72,21 +74,26 @@ public class FlinkConfigManager { private volatile Configuration defaultConfig; private volatile FlinkOperatorConfiguration defaultOperatorConfiguration; +private final boolean snapshotCrdInstalled; Review Comment: Added the note, thank you. I am going to also note here so others can see: This field will be determined in `FlinkOperator` when the operator starts, by checking whether the `FlinkStateSnapshot` CRD is installed on the Kubernetes cluster. When generating the default configuration for the operator, `kubernetes.operator.snapshot.resource.enabled` will be set to false in this class if `snapshotCrdInstalled` is false, to allow users to continue using the operator if they upgraded from a previous version but did not install the new `FlinkStateSnapshot` CRD. This check and field are planned to be removed in the release after this FLIP gets released (so hopefully 1.11 if this PR is released in 1.10).
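The fallback behavior described in that comment can be sketched generically. This is a hypothetical, map-based helper for illustration — the real class works on Flink's `Configuration` and the CRD check happens via the Kubernetes client at operator startup:

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotConfigFallback {
    // Hypothetical helper mirroring the described behavior: if the
    // FlinkStateSnapshot CRD is not installed, force the feature flag off so
    // operators upgraded without the new CRD keep working.
    static Map<String, String> applyFallback(Map<String, String> conf, boolean snapshotCrdInstalled) {
        Map<String, String> result = new HashMap<>(conf);
        if (!snapshotCrdInstalled) {
            result.put("kubernetes.operator.snapshot.resource.enabled", "false");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("kubernetes.operator.snapshot.resource.enabled", "true");
        // CRD missing -> flag forced off; CRD present -> user setting kept.
        System.out.println(applyFallback(conf, false).get("kubernetes.operator.snapshot.resource.enabled"));
        System.out.println(applyFallback(conf, true).get("kubernetes.operator.snapshot.resource.enabled"));
    }
}
```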
Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]
superdiaodiao commented on code in PR #25109: URL: https://github.com/apache/flink/pull/25109#discussion_r1690918688 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MathFunctionsITCase.java: ## @@ -141,4 +178,181 @@ Stream getTestSetSpecs() { new BigDecimal("123.45"), DataTypes.DECIMAL(6, 2).notNull())); } + +private Stream convTestCases() { +return Stream.of( +TestSetSpec.forFunction(BuiltInFunctionDefinitions.CONV) +.onFieldsWithData( +null, null, "100", "", "11abc", "\u000B\f 4521 \n\r\t") +.andDataTypes( +DataTypes.INT(), +DataTypes.STRING(), +DataTypes.STRING(), +DataTypes.STRING(), +DataTypes.STRING(), +DataTypes.STRING()) +// null input +.testResult($("f0").conv(2, 2), "CONV(f0, 2, 2)", null, DataTypes.STRING()) +.testResult($("f1").conv(2, 2), "CONV(f1, 2, 2)", null, DataTypes.STRING()) +.testResult( +$("f2").conv($("f0"), 2), +"CONV(f2, f0, 2)", +null, +DataTypes.STRING()) +.testResult( +$("f2").conv(2, $("f0")), +"CONV(f2, 2, f0)", +null, +DataTypes.STRING()) +// empty string +.testResult($("f3").conv(2, 4), "CONV(f3, 2, 4)", null, DataTypes.STRING()) +// invalid fromBase +.testResult($("f2").conv(1, 2), "CONV(f2, 1, 2)", null, DataTypes.STRING()) +.testResult( +$("f2").conv(40, 2), "CONV(f2, 40, 2)", null, DataTypes.STRING()) +// invalid toBase +.testResult( +$("f2").conv(2, -1), "CONV(f2, 2, -1)", null, DataTypes.STRING()) +.testResult( +$("f2").conv(2, -40), "CONV(f2, 2, -40)", null, DataTypes.STRING()) +// invalid num format, ignore suffix +.testResult( +$("f4").conv(10, 16), "CONV(f4, 10, 16)", "B", DataTypes.STRING()) +// num overflow +.testTableApiRuntimeError( +lit("").conv(16, 16), +NumberFormatException.class, +"The number overflows.") +.testSqlRuntimeError( +"CONV('', 16, 16)", +NumberFormatException.class, +"The number overflows.") +.testTableApiRuntimeError( +lit("FFFEE").conv(16, 16), +NumberFormatException.class, +"The number FFFEE overflows.") Review Comment: @snuyanzin These two test cases(` `vs 
`FFFEE`) are almost the same, do you think we should only keep one of them? Like the comment from another PR: https://github.com/apache/flink/pull/24773#discussion_r1603091969 cc @lsyldliu
Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
RocMarshal commented on code in PR #25113: URL: https://github.com/apache/flink/pull/25113#discussion_r1689451505 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java: ## @@ -81,18 +84,26 @@ public DeclarativeSlotPoolService( DeclarativeSlotPoolFactory declarativeSlotPoolFactory, Clock clock, Time idleSlotTimeout, -Time rpcTimeout) { +Time rpcTimeout, +Duration slotRequestMaxInterval, +@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { this.jobId = jobId; this.clock = clock; this.rpcTimeout = rpcTimeout; this.registeredTaskManagers = new HashSet<>(); this.declarativeSlotPool = declarativeSlotPoolFactory.create( -jobId, this::declareResourceRequirements, idleSlotTimeout, rpcTimeout); +jobId, +this::declareResourceRequirements, +idleSlotTimeout, +rpcTimeout, +slotRequestMaxInterval, +componentMainThreadExecutor); } -protected DeclarativeSlotPool getDeclarativeSlotPool() { +@VisibleForTesting +public DeclarativeSlotPool getDeclarativeSlotPool() { Review Comment: removed ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java: ## @@ -81,18 +84,26 @@ public DeclarativeSlotPoolService( DeclarativeSlotPoolFactory declarativeSlotPoolFactory, Clock clock, Time idleSlotTimeout, -Time rpcTimeout) { +Time rpcTimeout, +Duration slotRequestMaxInterval, +@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) { Review Comment: updated ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -599,4 +632,27 @@ private ResourceCounter getFulfilledRequirements( ResourceCounter getFulfilledResourceRequirements() { return fulfilledResourceRequirements; } + +@VisibleForTesting +@Nonnull +public ComponentMainThreadExecutor getComponentMainThreadExecutor() { +return componentMainThreadExecutor; +} + +@VisibleForTesting +@Nonnull +public Duration getSlotRequestMaxInterval() { +return 
slotRequestMaxInterval; +} Review Comment: deleted. ## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java: ## @@ -299,14 +310,22 @@ public BlocklistDeclarativeSlotPoolBuilder setBlockedTaskManagerChecker( return this; } -public BlocklistDeclarativeSlotPool build() { +public BlocklistDeclarativeSlotPool build( +Duration slotRequestMaxInterval, +ComponentMainThreadExecutor componentMainThreadExecutor) { return new BlocklistDeclarativeSlotPool( new JobID(), new DefaultAllocatedSlotPool(), ignored -> {}, blockedTaskManagerChecker, Time.seconds(20), -Time.seconds(20)); +Time.seconds(20), +slotRequestMaxInterval, +componentMainThreadExecutor); +} + +public BlocklistDeclarativeSlotPool build() { +return build(Duration.ZERO, forMainThread()); Review Comment: updated. ## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import
Re: [PR] [FLINK-35888][cdc-connector][paimon] Add e2e test for PaimonDataSink. [flink-cdc]
lvyanquan commented on code in PR #3491: URL: https://github.com/apache/flink-cdc/pull/3491#discussion_r1689470540 ## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.IOException; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** End-to-end tests for mysql cdc to Paimon pipeline job. 
*/ +@RunWith(Parameterized.class) +public class MySqlToPaimonE2eITCase extends PipelineTestEnvironment { +private static final Logger LOG = LoggerFactory.getLogger(MySqlToPaimonE2eITCase.class); + +public static final int TESTCASE_TIMEOUT_SECONDS = 60; + +private TableEnvironment tEnv; + +// -- +// MySQL Variables (we always use MySQL as the data source for easier verifying) +// -- +protected static final String MYSQL_TEST_USER = "mysqluser"; +protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + +@ClassRule +public static final MySqlContainer MYSQL = +(MySqlContainer) +new MySqlContainer( +MySqlVersion.V8_0) // v8 support both ARM and AMD architectures +.withConfigurationOverride("docker/mysql/my.cnf") +.withSetupSQL("docker/mysql/setup.sql") +.withDatabaseName("flink-test") +.withUsername("flinkuser") +.withPassword("flinkpw") +.withNetwork(NETWORK) +.withNetworkAliases("mysql") +.withLogConsumer(new Slf4jLogConsumer(LOG)); + +protected final UniqueDatabase mysqlInventoryDatabase = +new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + +public MySqlToPaimonE2eITCase() throws IOException {} + +@BeforeClass +public static void initializeContainers() { +LOG.info("Starting containers..."); +Startables.deepStart(Stream.of(MYSQL)).join(); +LOG.info("Containers are started."); +} + +@Before +public void before() throws Exception { +super.before(); +mysqlInventoryDatabase.createAndInitialize(); +tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); +} + +@After +public void after() { +super.after(); +mysqlInventoryDatabase.dropDatabase(); +} + +@Test +public void testSyncWholeDatabase() throws Exception { +
Re: [PR] [FLINK-35167][cdc-connector] Introduce MaxCompute pipeline DataSink [flink-cdc]
dingxin-tech commented on PR #3254: URL: https://github.com/apache/flink-cdc/pull/3254#issuecomment-2249497797 > Thanks @dingxin-tech for the nice work, the CI wasn't triggered properly, we need adjust the CI setting[1] when new connector or new module joining. > > [1]https://github.com/apache/flink-cdc/blob/master/.github/workflows/flink_cdc.yml Hi, I added tests for MaxCompute in the CI file. Additionally, I refactored the code to apply the newly released DataSink feature, which allows specifying a HashFunction, and upgraded the ODPS SDK. Could you please review this PR again? @leonardBang @lvyanquan
Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
1996fanrui commented on code in PR #25113: URL: https://github.com/apache/flink/pull/25113#discussion_r1690806756 ## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.util.clock.SystemClock; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; + +import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread; + +/** Test base class for {@link DeclarativeSlotPoolBridge}. */ +abstract class AbstractDeclarativeSlotPoolBridgeTest { + +protected static final Duration rpcTimeout = Duration.ofSeconds(20); +protected static final JobID jobId = new JobID(); +protected static final JobMasterId jobMasterId = JobMasterId.generate(); Review Comment: The CI failed; these names should be upper case. https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61067=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=6089 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
1996fanrui commented on code in PR #25113: URL: https://github.com/apache/flink/pull/25113#discussion_r1690742563 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java: ## @@ -292,7 +299,7 @@ public void connectToResourceManager(ResourceManagerGateway resourceManagerGatew resourceRequirementServiceConnectionManager.connect( resourceRequirements -> resourceManagerGateway.declareRequiredResources( -jobMasterId, resourceRequirements, rpcTimeout)); +jobMasterId, resourceRequirements, Time.fromDuration(rpcTimeout))); Review Comment: It's better to update the parameter type from `Time` to `Duration` for `declareRequiredResources`. That way we could avoid an unnecessary type conversion. ## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java: ## @@ -80,22 +88,25 @@ public DeclarativeSlotPoolBridgeBuilder setRequestSlotMatchingStrategy( return this; } -public DeclarativeSlotPoolBridge build() { +public DeclarativeSlotPoolBridge build( +ComponentMainThreadExecutor componentMainThreadExecutor) { Review Comment: In general, the build method shouldn't take any parameters; all parameters should be set via setters. ## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java: ## @@ -299,14 +308,18 @@ public BlocklistDeclarativeSlotPoolBuilder setBlockedTaskManagerChecker( return this; } -public BlocklistDeclarativeSlotPool build() { +public BlocklistDeclarativeSlotPool build( +Duration slotRequestMaxInterval, +ComponentMainThreadExecutor componentMainThreadExecutor) { Review Comment: In general, the build method shouldn't take any parameters; all parameters should be set via setters. ## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.util.clock.SystemClock; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; + +import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread; + +/** Test base class for {@link DeclarativeSlotPoolBridge}. 
*/ +abstract class AbstractDeclarativeSlotPoolBridgeTest { + +protected static final Duration rpcTimeout = Duration.ofSeconds(20); +protected static final JobID jobId = new JobID(); +protected static final JobMasterId jobMasterId = JobMasterId.generate(); +protected static final ComponentMainThreadExecutor mainThreadExecutor = forMainThread(); Review Comment: The `mainThreadExecutor` shouldn't be static, right? It will use the current thread as the executor. ## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java: ## @@ -58,45 +61,66 @@ import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link DefaultDeclarativeSlotPool}. */ -class DefaultDeclarativeSlotPoolTest { +@ExtendWith(ParameterizedTestExtension.class) +class
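The builder convention requested in the review above can be sketched as follows. This is a minimal, hypothetical builder (not the actual `DeclarativeSlotPoolBridgeBuilder` API): every input, including the executor, is supplied through a fluent setter, and `build()` itself takes no arguments.

```java
import java.time.Duration;
import java.util.concurrent.Executor;

// Hypothetical builder illustrating the reviewed convention:
// all inputs arrive via setters; build() is parameterless.
class BridgeBuilder {
    private Duration slotRequestMaxInterval = Duration.ofMillis(50); // default value
    private Executor mainThreadExecutor = Runnable::run;             // default value

    BridgeBuilder setSlotRequestMaxInterval(Duration interval) {
        this.slotRequestMaxInterval = interval;
        return this; // fluent API: each setter returns the builder
    }

    BridgeBuilder setMainThreadExecutor(Executor executor) {
        this.mainThreadExecutor = executor;
        return this;
    }

    // No parameters here: everything was configured up front via setters.
    String build() {
        return "bridge(maxInterval=" + slotRequestMaxInterval.toMillis() + "ms)";
    }
}
```

The advantage over `build(ComponentMainThreadExecutor)` is that every test configures the pool the same way, and adding a new knob later never changes the `build()` signature at existing call sites.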
Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]
pgaref commented on code in PR #25115: URL: https://github.com/apache/flink/pull/25115#discussion_r1690799595 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.ContextResolvedFunction; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildTableResult; +import static org.apache.flink.table.types.inference.TypeInferenceUtil.generateSignature; + +/** + * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier + * statement. + */ +@Internal +public class DescribeFunctionOperation implements Operation, ExecutableOperation { + +private final ObjectIdentifier sqlIdentifier; +private final boolean isExtended; + +public DescribeFunctionOperation(ObjectIdentifier sqlIdentifier, boolean isExtended) { +this.sqlIdentifier = sqlIdentifier; +this.isExtended = isExtended; +} + +public ObjectIdentifier getSqlIdentifier() { +return sqlIdentifier; +} + +public boolean isExtended() { +return isExtended; +} + +@Override +public String asSummaryString() { +Map params = new LinkedHashMap<>(); +params.put("identifier", sqlIdentifier); +params.put("isExtended", isExtended); +return OperationUtils.formatWithChildren( +"DESCRIBE FUNCTION", params, Collections.emptyList(), Operation::asSummaryString); +} + +@Override +public TableResultInternal execute(Context ctx) { +// DESCRIBE FUNCTION shows all the function properties. 
+Optional functionOpt = + ctx.getFunctionCatalog().lookupFunction(UnresolvedIdentifier.of(sqlIdentifier)); +if (!functionOpt.isPresent()) { +throw new ValidationException( +String.format( +"Function with the identifier '%s' doesn't exist.", +sqlIdentifier.asSummaryString())); +} +ContextResolvedFunction function = functionOpt.get(); +CatalogFunction catalogFunction = function.getCatalogFunction(); + +if (catalogFunction == null && !isExtended) { +throw new ValidationException( +String.format( +"Function with the identifier '%s' is a system function which can only be described by using the extended keyword. Use 'DESCRIBE FUNCTION EXTENDED %s' to see its properties.", +sqlIdentifier.asSummaryString(), sqlIdentifier.asSummaryString())); +} + +List> rows = new ArrayList<>(); +if (catalogFunction != null) { +rows.add(Arrays.asList("class name", catalogFunction.getClassName())); +rows.add( +Arrays.asList( +"function language", catalogFunction.getFunctionLanguage().toString())); +rows.add( +Arrays.asList( +"resource uris", catalogFunction.getFunctionResources().toString())); +rows.add(Arrays.asList("temporary", function.isTemporary())); +} + +if (isExtended) { +FunctionDefinition definition = function.getDefinition(); +rows.add(Arrays.asList("kind", definition.getKind().toString())); +
Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]
pgaref commented on code in PR #25115: URL: https://github.com/apache/flink/pull/25115#discussion_r1690790290 ## flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeFunction.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** + * DESCRIBE FUNCTION [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier sql call. Here we add + * Rich in className to follow the convention of {@link org.apache.calcite.sql.SqlDescribeTable}, + * which only had it to distinguish from calcite's original SqlDescribeTable, even though calcite + * does not have SqlDescribeFunction. 
+ */ +public class SqlRichDescribeFunction extends SqlCall { + +public static final SqlSpecialOperator OPERATOR = +new SqlSpecialOperator("DESCRIBE FUNCTION", SqlKind.OTHER); +protected final SqlIdentifier functionNameIdentifier; +private boolean isExtended; + +public SqlRichDescribeFunction( +SqlParserPos pos, SqlIdentifier functionNameIdentifier, boolean isExtended) { +super(pos); +this.functionNameIdentifier = functionNameIdentifier; +this.isExtended = isExtended; +} + +@Override +public SqlOperator getOperator() { +return OPERATOR; +} + +@Override +public List getOperandList() { +return Collections.singletonList(functionNameIdentifier); +} + +public boolean isExtended() { +return isExtended; +} + +public String[] fullFunctionName() { +return functionNameIdentifier.names.toArray(new String[0]); Review Comment: ```suggestion return functionNameIdentifier.names.toArray(new String[functionNameIdentifier.names.size()]); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
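For context on the suggestion above: both `toArray(new String[0])` and `toArray(new String[list.size()])` are correct and produce identical arrays, so this is a style choice; the zero-length form is the idiom commonly recommended on modern JVMs, while the pre-sized form saves one allocation on older ones. A small sketch with a plain `List` standing in for the Calcite `SqlIdentifier.names` field:

```java
import java.util.List;

class ToArrayIdiom {
    // Mirrors functionNameIdentifier.names.toArray(...) on a plain list of name parts.
    static String[] viaEmptyArray(List<String> names) {
        return names.toArray(new String[0]);            // JVM sizes the result itself
    }

    static String[] viaPreSizedArray(List<String> names) {
        return names.toArray(new String[names.size()]); // caller pre-sizes the result
    }
}
```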
Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]
pgaref commented on code in PR #25115: URL: https://github.com/apache/flink/pull/25115#discussion_r1690785755 ## flink-table/flink-sql-client/src/test/resources/sql/function.q: ## @@ -1,4 +1,4 @@ -# function.q - CREATE/DROP/ALTER FUNCTION +# function.q - CREATE/DROP/ALTER/SHOW/DESCRIBE FUNCTION Review Comment: ```suggestion # function.q - CREATE/DROP/ALTER/DESCRIBE FUNCTION ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]
pgaref commented on code in PR #25115: URL: https://github.com/apache/flink/pull/25115#discussion_r1690786505 ## flink-table/flink-sql-client/src/test/resources/sql/function.q: ## @@ -265,7 +265,7 @@ show user functions; # test alter function # == -alter function func11 as 'org.apache.flink.table.client.gateway.local.ExecutorImplITCase$TestScalaFunction'; +alter function func11 as 'org.apache.flink.table.client.gateway.ExecutorImplITCase$TestScalaFunction'; Review Comment: Unrelated change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]
yuxiqian commented on PR #2944: URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2249296709 Thanks for @lvyanquan's rapid response! Ran migration tests and it works as expected. * `TableChangeInfo#Serializer#magicTableId` should be named in screaming case `MAGIC_TABLE_ID` as a `static final` constant, or Spotless will complain. * `MySqlPipelineITCase` needs update to reflect changes in default value: ``` Error:MySqlPipelineITCase.testInitialStartupMode:228 expected: CreateTableEvent{tableId=inventory_kohdit.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()} but was: CreateTableEvent{tableId=inventory_kohdit.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()} ... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]
lvyanquan commented on code in PR #2944: URL: https://github.com/apache/flink-cdc/pull/2944#discussion_r1690719229 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java: ## @@ -139,6 +144,10 @@ public TableChangeInfo deserialize(int version, byte[] serialized) throws IOExce try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { TableId tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in)); +if (tableId.equals(magicTableId)) { +version = in.readInt(); +tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in)); +} Review Comment: addressed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.20][Test] Fix tests about file-merging [flink]
Zakelly merged PR #25118: URL: https://github.com/apache/flink/pull/25118 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35858][Runtime/State] Namespace support for async state [flink]
Zakelly merged PR #25103: URL: https://github.com/apache/flink/pull/25103 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35858][Runtime/State] Namespace support for async state [flink]
Zakelly commented on PR #25103: URL: https://github.com/apache/flink/pull/25103#issuecomment-2249243839 Thanks for the review, merging... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35835][test] Make file-merging test tolerate the scenario that source does not produce any record [flink]
Zakelly merged PR #25116: URL: https://github.com/apache/flink/pull/25116 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]
yuxiqian commented on code in PR #2944: URL: https://github.com/apache/flink-cdc/pull/2944#discussion_r1690696796 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java: ## @@ -139,6 +144,10 @@ public TableChangeInfo deserialize(int version, byte[] serialized) throws IOExce try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { TableId tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in)); +if (tableId.equals(magicTableId)) { +version = in.readInt(); +tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in)); +} Review Comment: If no `magicTableId` is present, `version` will not be overwritten and will default to the latest version (`TableChangeInfo.SERIALIZER.getVersion()`). Is this intended? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
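The compatibility concern raised in this thread can be illustrated with a minimal sketch of the magic-header technique (all names here are illustrative, not the real `TableChangeInfo` serializer): new payloads begin with a sentinel value followed by an explicit version, while payloads written before the header existed start directly with data. When the sentinel is absent, the payload must be decoded as the oldest legacy version, not as the current serializer version.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Sketch of versioned deserialization guarded by a magic header.
class VersionedFormat {
    static final long MAGIC = 0xCAFEBABEDEADBEEFL; // sentinel emitted by new writers
    static final int LEGACY_VERSION = 1;           // format used before the header existed

    static int detectVersion(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            in.mark(Long.BYTES);
            if (in.readLong() == MAGIC) {
                return in.readInt(); // new format: version written explicitly after the magic
            }
            in.reset();              // legacy payload: rewind to the start of the data and
            return LEGACY_VERSION;   // decode with the OLD version, never the current one
        } catch (IOException e) {
            throw new IllegalStateException("corrupt payload", e);
        }
    }
}
```

The sentinel must be a value that could never legitimately appear as the first bytes of a legacy payload; with `TableId` as the leading field, a reserved "magic table id" plays that role in the PR under discussion.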
Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]
lvyanquan commented on PR #2944: URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2249216319 @yuxiqian I've added a magic header to the SERIALIZER of TableChangeInfo to distinguish state written by 3.1.1 from state written by later versions; can you help check again? This change can only be verified once both https://github.com/apache/flink-cdc/pull/3426 and this PR are in, so I intend to include it in the current PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34877][cdc] Flink CDC pipeline transform supports type conversion [flink-cdc]
yuxiqian commented on code in PR #3357: URL: https://github.com/apache/flink-cdc/pull/3357#discussion_r1689894811 ## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java: ## @@ -555,6 +574,64 @@ void testTimestampDiffTransform() throws Exception { .isEqualTo(new StreamRecord<>(insertEventExpect)); } +@Test +void testNullCastTransform() throws Exception { Review Comment: Great to see this! Could you please insert more non-null data rows to exercise non-null value casting and verify that the casting results are correct? Maybe `FlinkPipelineComposerITCase` is a good place for this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28023][docs]Translate "Native Kubernetes" page into Chinese [flink]
bobobozh closed pull request #25098: [FLINK-28023][docs]Translate "Native Kubernetes" page into Chinese URL: https://github.com/apache/flink/pull/25098 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][test] Add CDC 3.1.1 release to migration test versions [flink-cdc]
yuxiqian commented on PR #3426: URL: https://github.com/apache/flink-cdc/pull/3426#issuecomment-2249185454 Could this be merged to ensure our next version is compatible with 3.1.1? @leonardBang -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]
AlanConfluent commented on code in PR #25115: URL: https://github.com/apache/flink/pull/25115#discussion_r1690520725 ## flink-table/flink-sql-client/src/test/resources/sql/function.q: ## @@ -346,3 +346,69 @@ show user functions; SHOW JARS; Empty set !ok + +# == +# test describe function +# == + +ADD JAR '$VAR_UDF_JAR_PATH'; +[INFO] Execute statement succeeded. +!info + +describe function temp_upperudf; ++---+-$VAR_UDF_JAR_PATH_DASH+ +| info name | $VAR_UDF_JAR_PATH_SPACE info value | Review Comment: What are these variables? `VAR_UDF_JAR_PATH_SPACE`. Seems some of the docs have them, and others don't but makes it hard to read the source file. ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.ContextResolvedFunction; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildTableResult; +import static org.apache.flink.table.types.inference.TypeInferenceUtil.generateSignature; + +/** + * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier + * statement. + */ +@Internal +public class DescribeFunctionOperation implements Operation, ExecutableOperation { + +private final ObjectIdentifier sqlIdentifier; +private final boolean isExtended; + +public DescribeFunctionOperation(ObjectIdentifier sqlIdentifier, boolean isExtended) { +this.sqlIdentifier = sqlIdentifier; +this.isExtended = isExtended; +} + +public ObjectIdentifier getSqlIdentifier() { +return sqlIdentifier; +} + +public boolean isExtended() { +return isExtended; +} + +@Override +public String asSummaryString() { +Map params = new LinkedHashMap<>(); +params.put("identifier", sqlIdentifier); +params.put("isExtended", isExtended); +return OperationUtils.formatWithChildren( +"DESCRIBE FUNCTION", params, Collections.emptyList(), Operation::asSummaryString); +} + +@Override +public TableResultInternal execute(Context ctx) { +// DESCRIBE FUNCTION shows all the function properties. 
+Optional functionOpt = + ctx.getFunctionCatalog().lookupFunction(UnresolvedIdentifier.of(sqlIdentifier)); +if (!functionOpt.isPresent()) { +throw new ValidationException( +String.format( +"Function with the identifier '%s' doesn't exist.", +sqlIdentifier.asSummaryString())); +} +ContextResolvedFunction function = functionOpt.get(); +CatalogFunction catalogFunction = function.getCatalogFunction(); + +if (catalogFunction == null && !isExtended) { +throw new ValidationException( +String.format( +"Function with the identifier '%s' is a system function which can only be described by using the extended keyword. Use 'DESCRIBE FUNCTION EXTENDED %s' to see its properties.", +sqlIdentifier.asSummaryString(),
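The lookup-and-validate flow in `DescribeFunctionOperation#execute` quoted above can be sketched independently of the Flink catalog types. This is an illustrative, framework-free simplification: the `Map` stands in for the function catalog, a `null` value stands in for a system function (which has no `CatalogFunction`), and all names and messages here are assumptions, not Flink's exact API.

```java
import java.util.Map;

// Framework-free sketch of the validation flow above: look up the function,
// fail if it does not exist, and require the EXTENDED keyword for system
// functions (modeled here as entries with a null class name).
public class DescribeFunctionSketch {

    static String describe(Map<String, String> registry, String identifier, boolean isExtended) {
        if (!registry.containsKey(identifier)) {
            throw new IllegalArgumentException(
                    String.format("Function with the identifier '%s' doesn't exist.", identifier));
        }
        String catalogFunction = registry.get(identifier); // null => system function
        if (catalogFunction == null && !isExtended) {
            throw new IllegalArgumentException(
                    String.format(
                            "'%s' is a system function; use 'DESCRIBE FUNCTION EXTENDED %s'.",
                            identifier, identifier));
        }
        return catalogFunction == null ? "system function" : catalogFunction;
    }
}
```

The interesting design point is the asymmetry: a missing function is always an error, while a system function is only an error when described without `EXTENDED`.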
Re: [PR] [FLINK-35500][Connectors/DynamoDB] DynamoDb Table API Sink fails to delete elements due to key not found [flink-connector-aws]
boring-cyborg[bot] commented on PR #152: URL: https://github.com/apache/flink-connector-aws/pull/152#issuecomment-2248823319 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35500][Connectors/DynamoDB] DynamoDb Table API Sink fails to delete elements due to key not found [flink-connector-aws]
robg-eb opened a new pull request, #152: URL: https://github.com/apache/flink-connector-aws/pull/152 ## Purpose of the change When `DynamoDbSink` is used with CDC sources, it fails to process DELETE records and throws ``` org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The provided key element does not match the schema ``` This is due to `DynamoDbSinkWriter` passing the whole DynamoDb Item as key instead of the constructed primary key alone. ## Verifying this change This change added tests and can be verified as follows: - Added tests to `RowDataToAttributeValueConverterTest.java`: - `testDeleteOnlyPrimaryKey` - Ensures that for a `DELETE` request, only the (single) PK field is included - `testDeleteOnlyPrimaryKeys` - Ensures that for a `DELETE` request with a composite PK, both PK fields are included. - `testPKIgnoredForInsert` - Ensures that PK is ignored when an `INSERT` request is done, and all fields continue to be included as they have been in the past. - `testPKIgnoredForUpdateAfter` - Ensures that PK is ignored when an `UPDATE_AFTER` request is done, and all fields continue to be included as they have been in the past. - Ran manual tests following the steps noted in https://issues.apache.org/jira/browse/FLINK-35500 under "Steps To Reproduce". Running the SQL statement as described in Step 6 now properly runs a DELETE in DynamoDB. ## Significant changes Previously, the `PRIMARY KEY` field had no significance for a DynamoDB Sink via Table API. Now, the `PRIMARY KEY` is required when processing a CDC stream that contains `DELETES`. This is not a 'breaking change' because the previous behavior for processing a CDC stream containing `DELETES` was already a failure (`The provided key element does not match the schema`). This change now provides a clear exception informing users to specify a Primary Key to avoid that failure.
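The fix described in the PR above boils down to: for a `DELETE`, send only the attributes belonging to the declared `PRIMARY KEY`, not the whole item. Here is an illustrative, stdlib-only sketch of that idea (not the actual `DynamoDbSinkWriter` code; names and types are assumptions):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Sketch of the DELETE-key construction described above: DynamoDB rejects a
// DeleteItem whose key contains non-key attributes ("The provided key element
// does not match the schema"), so only the declared primary-key fields are kept.
public class DeleteKeySketch {

    static Map<String, Object> keyFor(Map<String, Object> item, Set<String> primaryKeys) {
        if (primaryKeys.isEmpty()) {
            // Mirrors the new behavior: DELETEs from a CDC stream require a declared PRIMARY KEY.
            throw new IllegalStateException("A PRIMARY KEY must be declared to process DELETEs.");
        }
        Map<String, Object> key = new LinkedHashMap<>();
        for (String pk : primaryKeys) {
            key.put(pk, item.get(pk)); // keep only primary-key attributes
        }
        return key;
    }
}
```

For `INSERT`/`UPDATE_AFTER` the full item would still be sent unchanged, matching the `testPKIgnoredForInsert`/`testPKIgnoredForUpdateAfter` cases listed above.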
Re: [PR] [FLINK-30481][FLIP-277] GlueCatalog Implementation [flink-connector-aws]
foxus commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1690278314 ## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java: ## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.GlueResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Utilities related glue Operation. */ +@Internal +public class GlueUtils { + +private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class); + +/** + * Glue supports lowercase naming convention. + * + * @param name fully qualified name. + * @return modified name according to glue convention. 
+ */ +public static String getGlueConventionalName(String name) { + +return name.toLowerCase(Locale.ROOT); +} + +/** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * + * @param databaseProperties database properties. + * @param databaseName fully qualified name for database. + * @return location for database. + */ +public static String extractDatabaseLocation( +final Map databaseProperties, +final String databaseName, +final String catalogPath) { +if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { +return databaseProperties.remove(GlueCatalogConstants.LOCATION_URI); +} else { +LOG.info("No location URI Set. Using Catalog Path as default"); +return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + databaseName; +} +} + +/** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * + * @param tableProperties table properties. + * @param tablePath fully qualified object for table. + * @return location for table. + */ +public static String extractTableLocation( +final Map tableProperties, +final ObjectPath tablePath, +final String catalogPath) { +if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { +return tableProperties.remove(GlueCatalogConstants.LOCATION_URI); +} else { +return catalogPath ++ GlueCatalogConstants.LOCATION_SEPARATOR ++ tablePath.getDatabaseName() ++ GlueCatalogConstants.LOCATION_SEPARATOR ++
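The `extractDatabaseLocation` logic quoted above (take and remove the explicit location from the properties when present, otherwise fall back to a path derived from the catalog path) can be sketched on its own. In this sketch, `"locationUri"` and `"/"` stand in for `GlueCatalogConstants.LOCATION_URI` and `LOCATION_SEPARATOR`, whose actual values are assumptions here:

```java
import java.util.Map;

// Stand-alone sketch of the location-extraction fallback shown in GlueUtils above.
public class GlueLocationSketch {

    static String extractDatabaseLocation(Map<String, String> props, String databaseName, String catalogPath) {
        if (props.containsKey("locationUri")) {
            // Note the side effect: the key is removed from the properties map.
            return props.remove("locationUri");
        }
        // Fallback: default location derived from the catalog path and database name.
        return catalogPath + "/" + databaseName;
    }
}
```

The side-effecting `remove` is the subtle part: callers get a properties map that no longer carries the location key after extraction.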
Re: [PR] [FLINK-30481][FLIP-277] GlueCatalog Implementation [flink-connector-aws]
foxus commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1690147540 ## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/operator/GlueFunctionOperator.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.operator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogFunctionImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.util.GlueUtils; +import org.apache.flink.table.resource.ResourceUri; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.PrincipalType; +import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; +import 
software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** Utilities for Glue catalog Function related operations. */ +@Internal +public class GlueFunctionOperator extends GlueOperator { + +private static final Logger LOG = LoggerFactory.getLogger(GlueFunctionOperator.class); + +public GlueFunctionOperator(String catalogName, GlueClient glueClient, String glueCatalogId) { +super(catalogName, glueClient, glueCatalogId); +} + +/** + * Create a function. Function name should be handled in a case-insensitive way. + * + * @param functionPath path of the function + * @param function Flink function to be created + * @throws CatalogException in case of any runtime exception + */ +public void createGlueFunction(ObjectPath functionPath, CatalogFunction function) +throws CatalogException, FunctionAlreadyExistException { +UserDefinedFunctionInput functionInput = createFunctionInput(functionPath, function); +CreateUserDefinedFunctionRequest.Builder createUDFRequest = +CreateUserDefinedFunctionRequest.builder() +.databaseName(functionPath.getDatabaseName()) +.catalogId(getGlueCatalogId()) +.functionInput(functionInput); +try { +CreateUserDefinedFunctionResponse response = + glueClient.createUserDefinedFunction(createUDFRequest.build()); +GlueUtils.validateGlueResponse(response); +LOG.info("Created Function: {}", functionPath.getFullName()); +} catch (AlreadyExistsException e) { +LOG.error( +String.format( +"%s.%s already Exists. Function language of type:
Re: [PR] [FLINK-30481][FLIP-277] GlueCatalog Implementation [flink-connector-aws]
foxus commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1690285665 ## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java: ## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.GlueResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Utilities related glue Operation. */ +@Internal +public class GlueUtils { + +private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class); + +/** + * Glue supports lowercase naming convention. + * + * @param name fully qualified name. + * @return modified name according to glue convention. 
+ */ +public static String getGlueConventionalName(String name) { + +return name.toLowerCase(Locale.ROOT); +} + +/** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * + * @param databaseProperties database properties. + * @param databaseName fully qualified name for database. + * @return location for database. + */ +public static String extractDatabaseLocation( +final Map databaseProperties, +final String databaseName, +final String catalogPath) { +if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { +return databaseProperties.remove(GlueCatalogConstants.LOCATION_URI); +} else { +LOG.info("No location URI Set. Using Catalog Path as default"); +return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + databaseName; +} +} + +/** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * + * @param tableProperties table properties. + * @param tablePath fully qualified object for table. + * @return location for table. + */ +public static String extractTableLocation( +final Map tableProperties, +final ObjectPath tablePath, +final String catalogPath) { +if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { +return tableProperties.remove(GlueCatalogConstants.LOCATION_URI); +} else { +return catalogPath ++ GlueCatalogConstants.LOCATION_SEPARATOR ++ tablePath.getDatabaseName() ++ GlueCatalogConstants.LOCATION_SEPARATOR ++
Re: [PR] [FLINK-22748] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]
dmariassy commented on code in PR #109: URL: https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1690320572 ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java: ## @@ -144,14 +147,34 @@ public void open( valueSerialization.open(context); } +private String getTargetTopic(RowData element) { +final String topic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC); +if (topic == null && topics == null) { +throw new IllegalArgumentException( +"The topic of the sink record is not valid. Expected a single topic but no topic is set."); +} else if (topic == null && topics.size() == 1) { +return topics.get(0); Review Comment: > then for topics.size == 1 the topic metadata column should not be writable I think that makes a lot of sense!
Re: [PR] [FLINK-22748] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]
klam-shop commented on code in PR #109: URL: https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1690254530 ## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java: ## @@ -144,14 +147,34 @@ public void open( valueSerialization.open(context); } +private String getTargetTopic(RowData element) { +final String topic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC); +if (topic == null && topics == null) { +throw new IllegalArgumentException( +"The topic of the sink record is not valid. Expected a single topic but no topic is set."); +} else if (topic == null && topics.size() == 1) { +return topics.get(0); Review Comment: Good idea, I thought about it before and thought it might make the semantics a bit complicated, because if we do this, then for `topics.size == 1` I think the `topic` metadata column should not be writable, but I think it's worth saving the processing. Updated
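The topic-resolution logic discussed in the review thread above can be sketched as follows. This is a hedged simplification of the quoted diff, not the final merged code: it prefers the value written to the `topic` metadata column, falls back to the single configured topic, and treats a configured topic list as a whitelist (the whitelist check is an assumption based on the docs change in this PR):

```java
import java.util.List;

// Illustrative sketch of getTargetTopic resolution for the Kafka SQL sink.
public class TargetTopicSketch {

    static String getTargetTopic(String metadataTopic, List<String> topics) {
        if (metadataTopic == null) {
            if (topics == null || topics.isEmpty()) {
                throw new IllegalArgumentException(
                        "The topic of the sink record is not valid. Expected a single topic but no topic is set.");
            }
            if (topics.size() == 1) {
                return topics.get(0); // the single configured topic acts as the default
            }
            throw new IllegalArgumentException(
                    "Multiple topics configured but no topic metadata was written.");
        }
        // Hypothetical continuation: treat the configured list as a whitelist.
        if (topics != null && !topics.isEmpty() && !topics.contains(metadataTopic)) {
            throw new IllegalArgumentException(
                    "Topic '" + metadataTopic + "' is not an allowed sink topic.");
        }
        return metadataTopic;
    }
}
```

This also makes the reviewers' point concrete: when exactly one topic is configured, the metadata value is redundant, so making the `topic` metadata column non-writable in that case saves the per-record lookup.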
Re: [PR] [FLINK-22748] Allow dynamic target topic selection in SQL Kafka sinks [flink-connector-kafka]
klam-shop commented on code in PR #109: URL: https://github.com/apache/flink-connector-kafka/pull/109#discussion_r1690197712 ## docs/content/docs/connectors/table/kafka.md: ## @@ -196,11 +196,11 @@ Connector Options topic - required for sink + optional yes (none) String - Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks. + Topic name(s) to read data from when the table is used as source, or allowed topics for writing when the table is used as sink. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified for sources. For sinks, the topic or list of topics provided are they only valid values (whitelist) that can be written to the `topic` metadata column for determining what topic to produce to. If only a single topic is provided, this is the default topic to produce to. If not provided, any value of `topic` metadata is permitted. Review Comment: Good call, fixed.
Re: [PR] Add support to pass Datadog API Key as environment variable [flink]
swetakala commented on PR #19684: URL: https://github.com/apache/flink/pull/19684#issuecomment-2248547135 I also need this change; I have opened a ticket to address it: https://issues.apache.org/jira/browse/FLINK-33994.
Re: [PR] [BP-1.20][Test] Fix tests about file-merging [flink]
flinkbot commented on PR #25118: URL: https://github.com/apache/flink/pull/25118#issuecomment-2248189331 ## CI report: * 78c25a829275db5bf0295ce5fefab22c89afb7b0 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [BP-1.20][Test] Fix tests about file-merging [flink]
Zakelly opened a new pull request, #25118: URL: https://github.com/apache/flink/pull/25118 ## What is the purpose of the change BP #25090 and #25116 ## Brief change log - `SnapshotFileMergingCompatibilityITCase` ## Verifying this change Run `SnapshotFileMergingCompatibilityITCase` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
ferenc-csaky commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1689490037 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ## @@ -72,21 +74,26 @@ public class FlinkConfigManager { private volatile Configuration defaultConfig; private volatile FlinkOperatorConfiguration defaultOperatorConfiguration; +private final boolean snapshotCrdInstalled; Review Comment: As we discussed offline, the whole snapshot CRD install check is planned for 1 release to give more meaningful errors for users the first time they upgrade to an operator version that includes this feature, but will be useless after that. So I think it would make sense to mark this with a TODO to remove it in the 1.11 release.
Re: [PR] Beifang flink 1.17 [flink]
beifang1999 closed pull request #25117: Beifang flink 1.17 URL: https://github.com/apache/flink/pull/25117
[PR] Beifang flink 1.17 [flink]
beifang1999 opened a new pull request, #25117: URL: https://github.com/apache/flink/pull/25117 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
ferenc-csaky commented on code in PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#discussion_r1689526438 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java: ## @@ -83,20 +82,20 @@ void deleteClusterDeployment( Configuration conf, boolean deleteHaData); -void cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf) +Optional cancelSessionJob( +FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf) throws Exception; -void triggerSavepoint( +String triggerSavepoint( String jobId, -SnapshotTriggerType triggerType, -SavepointInfo savepointInfo, +org.apache.flink.core.execution.SavepointFormatType savepointFormatType, +String savepointDirectory, Configuration conf) throws Exception; -void triggerCheckpoint( +String triggerCheckpoint( String jobId, -SnapshotTriggerType triggerType, -CheckpointInfo checkpointInfo, +org.apache.flink.core.execution.CheckpointType checkpointFormatType, Review Comment: nit: checkpointFormatType -> checkpointType ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java: ## @@ -44,61 +45,56 @@ public class FlinkStateSnapshotContext { private final Context josdkContext; private final FlinkConfigManager configManager; -private FlinkOperatorConfiguration operatorConfig; -private Configuration referencedJobObserveConfig; -private FlinkDeployment referencedJobFlinkDeployment; +@Getter(lazy = true) +private final FlinkOperatorConfiguration operatorConfig = operatorConfig(); + +@Getter(lazy = true) +private final Configuration referencedJobObserveConfig = referencedJobObserveConfig(); + +@Getter(lazy = true) +private final FlinkDeployment referencedJobFlinkDeployment = referencedJobFlinkDeployment(); /** * @return Operator configuration for this resource. 
*/ -public FlinkOperatorConfiguration getOperatorConfig() { -if (operatorConfig != null) { -return operatorConfig; -} -return operatorConfig = -configManager.getOperatorConfiguration( -getResource().getMetadata().getNamespace(), null); +public FlinkOperatorConfiguration operatorConfig() { +return configManager.getOperatorConfiguration( +getResource().getMetadata().getNamespace(), null); +} + +public Configuration referencedJobObserveConfig() { +return configManager.getObserveConfig(getReferencedJobFlinkDeployment()); +} + +public FlinkDeployment referencedJobFlinkDeployment() { Review Comment: These lazy init methods can be private. ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ## @@ -72,21 +74,26 @@ public class FlinkConfigManager { private volatile Configuration defaultConfig; private volatile FlinkOperatorConfiguration defaultOperatorConfiguration; +private final boolean snapshotCrdInstalled; Review Comment: As we discussed offline, the whole snapshot CRD install check is planned for 1 release to give more meaningful errors for users, so I think it would make sense to mark this with a TODO to remove it in the 1.11 release. ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java: ## @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.utils; + +import org.apache.flink.autoscaler.utils.DateTimeUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.api.CrdConstants; +import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import
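The `@Getter(lazy = true)` pattern discussed in the diff above replaces the hand-written null-check caching. As a rough sketch in plain Java (illustrative names, not the Lombok-generated bytecode or the actual `FlinkStateSnapshotContext` class), the generated getter behaves like a thread-safe compute-once cache around a private supplier method:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of what Lombok's @Getter(lazy = true) roughly generates:
// a double-checked, compute-once cache around a private init method.
// Field and return types here are illustrative placeholders.
public class LazyGetterSketch {
    private final AtomicReference<Object> operatorConfig = new AtomicReference<>();

    public String getOperatorConfig() {
        Object value = operatorConfig.get();
        if (value == null) {
            synchronized (operatorConfig) {
                value = operatorConfig.get();
                if (value == null) {
                    // The supplier can stay private, as the reviewer suggests;
                    // only the generated getter needs to be public.
                    value = operatorConfig();
                    operatorConfig.set(value);
                }
            }
        }
        return (String) value;
    }

    private String operatorConfig() {
        return "computed-once"; // stands in for the expensive computation
    }
}
```

This is why the reviewer's note holds: callers only ever touch the generated getter, so the lazy-init methods themselves can be private.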
Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]
lvyanquan commented on PR #2944: URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2247879650 Fixed the test case. And the wrong use of SERIALIZER in [TransformSchemaOperator](https://github.com/apache/flink-cdc/blob/ea71b2302ddc5f9b7be65843dbf3f5bed4ca9d8e/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java#L127) is actually a separate problem that could be tracked in another JIRA ticket. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35864][Table SQL / API] Add CONV function [flink]
lsyldliu commented on code in PR #25109: URL: https://github.com/apache/flink/pull/25109#discussion_r1689382425 ## docs/data/sql_functions.yml: ## @@ -238,6 +238,16 @@ arithmetic: NUMERIC.hex() STRING.hex() description: Returns a string representation of an integer NUMERIC value or a STRING in hex format. Returns NULL if the argument is NULL. E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64". + - sql: CONV(num, fromBase, toBase) +table: num.conv(fromBase, toBase) +description: | + Converts num from fromBase to toBase. + The function supports base 2 to base 36, fromBase in [2, 36], ABS(toBase) in [2, 36], otherwise the function returns null. + If toBase is negative, num is interpreted as a signed number, otherwise it is treated as an unsigned number. The result is consistent with this rule. + If num overflows, the function raises an error. + num [ | ], fromBase [ | | ], toBase [ | | ] Review Comment: About `num`: I think it should tell users the exact type instead of the logical type family. Users don't know it. For `fromBase` and `toBase`: according to the JIRA issue description, if Integer represents all integer numeric types, BIGINT should also be supported. ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -1817,6 +1817,29 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING( .build(); +public static final BuiltInFunctionDefinition CONV = +BuiltInFunctionDefinition.newBuilder() +.name("CONV") +.kind(SCALAR) +.inputTypeStrategy( +sequence( +Arrays.asList("num", "fromBase", "toBase"), +Arrays.asList( +or( + logical(LogicalTypeFamily.INTEGER_NUMERIC), + logical(LogicalTypeFamily.CHARACTER_STRING)), +or( + logical(LogicalTypeRoot.TINYINT), Review Comment: I think we should also support the `bigint` type; I see Spark also supports this type. 
so the code can be optimized as follows? ``` or(logical(LogicalTypeFamily.INTEGER_NUMERIC)), or(logical(LogicalTypeFamily.INTEGER_NUMERIC)) ``` ## docs/data/sql_functions.yml: ## @@ -238,6 +238,16 @@ arithmetic: NUMERIC.hex() STRING.hex() description: Returns a string representation of an integer NUMERIC value or a STRING in hex format. Returns NULL if the argument is NULL. E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64". + - sql: CONV(num, fromBase, toBase) +table: num.conv(fromBase, toBase) +description: | + Converts num from fromBase to toBase. Review Comment: ```suggestion Converts num from fromBase to toBase. ``` ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ConvFunction.java: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.data.binary.BinaryStringDataUtil; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.runtime.functions.BaseConversionUtils; + +import javax.annotation.Nullable; + +import static java.nio.charset.StandardCharsets.UTF_8;
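For readers following the CONV discussion above, here is a minimal sketch of MySQL-style base conversion. This is an illustrative stand-alone class, not the actual `ConvFunction` runtime implementation; the overflow and whitespace handling are simplifying assumptions.

```java
// Illustrative sketch of CONV(num, fromBase, toBase) semantics; not the
// Flink runtime implementation. Bases must be in [2, 36]; a negative
// toBase means the result is rendered as a signed number.
public class ConvSketch {
    public static String conv(String num, int fromBase, int toBase) {
        int absTo = Math.abs(toBase);
        if (fromBase < 2 || fromBase > 36 || absTo < 2 || absTo > 36) {
            return null; // out-of-range bases yield NULL, per the docs above
        }
        // Interpret the input as an unsigned 64-bit value; overflow throws.
        long value = Long.parseUnsignedLong(num.trim(), fromBase);
        return toBase > 0
                ? Long.toUnsignedString(value, toBase).toUpperCase()
                : Long.toString(value, absTo).toUpperCase(); // signed rendering
    }
}
```

Under these assumptions, `conv("100", 10, 2)` yields `"1100100"` and `conv("14", 16, 10)` yields `"20"`.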
Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]
leonardBang merged PR #3448: URL: https://github.com/apache/flink-cdc/pull/3448 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]
yuxiqian commented on PR #2944: URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2247573307 > Done rebase. > > And the cause of the failed case is that the version info of SERIALIZER was not included in [TransformSchemaOperator](https://github.com/apache/flink-cdc/blob/ea71b2302ddc5f9b7be65843dbf3f5bed4ca9d8e/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java#L127), which will cause incompatible state. I would like to start a discussion with @aiwenmo @yuxiqian to solve it. Can we add some magic header bytes in newer serialization formats (with the serializer version, of course) to distinguish them from legacy ones? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [cdc-common] add field of defaultValue to Column. [flink-cdc]
yuxiqian commented on PR #2944: URL: https://github.com/apache/flink-cdc/pull/2944#issuecomment-2247569644 > Done rebase. > > And the cause of the failed case is that the version info of SERIALIZER was not included in [TransformSchemaOperator](https://github.com/apache/flink-cdc/blob/ea71b2302ddc5f9b7be65843dbf3f5bed4ca9d8e/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java#L127), which will cause incompatible state. I would like to start a discussion with @aiwenmo @yuxiqian to solve it. My two cents: since the legacy problem is that we didn't serialize a version number, can we add magic header bytes to distinguish newer versions (with a serializer version) from legacy ones? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
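The magic-header idea floated in this thread can be sketched as follows. The class name, magic value, and layout below are hypothetical illustrations, not flink-cdc's actual wire format: prefix newer payloads with a magic number plus a version, and treat anything without the magic prefix as a legacy, version-0 blob.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical versioned envelope; the magic value and layout are
// illustrative assumptions, not flink-cdc's actual serialization format.
public class VersionedEnvelope {
    static final int MAGIC = 0x0DDC_CDC5; // arbitrary marker (assumption)

    public static byte[] wrap(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(MAGIC);   // distinguishes new blobs from legacy ones
        out.writeInt(version); // serializer version for future evolution
        out.write(payload);
        return bos.toByteArray();
    }

    /** Returns the serializer version, or 0 for legacy (pre-magic) blobs. */
    public static int readVersion(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return (bytes.length >= 8 && in.readInt() == MAGIC) ? in.readInt() : 0;
    }
}
```

The design choice here is that the reader never fails on old state: any blob shorter than the header, or not starting with the magic bytes, is routed to the legacy deserialization path.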
Re: [PR] [FLINK-35835][test] Make file-merging test tolerate the scenario that source does not produce any record [flink]
Zakelly commented on code in PR #25116: URL: https://github.com/apache/flink/pull/25116#discussion_r1689517664 ## flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java: ## @@ -344,13 +343,14 @@ private static void verifyCheckpointExistOrWaitDeleted( } /** - * Traverse the checkpoint metadata and verify all the state handle is disposed. + * Traverse the checkpoint metadata and verify all the state handle exist or be disposed. * * @param metadata the metadata to traverse. + * @param exist all corresponding files of the state handle should exist * @return true if all corresponding files are deleted. */ -private static boolean verifyCheckpointDisposed(CheckpointMetadata metadata) { -AtomicBoolean disposed = new AtomicBoolean(true); +private boolean verifyCheckpointExist(CheckpointMetadata metadata, boolean exist) { Review Comment: Because the logger is referenced [here](https://github.com/apache/flink/pull/25116/files/a2b0a3795011a8fae37480af5952e7bfface6f4a#diff-ecfc97da093a973dee815155ff11b431a45cca9853f77aab00916ad68757510dR381-R382), and it is a non-static member. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
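The static-vs-instance point above is plain Java scoping: a `static` method has no `this`, so it cannot read instance members, and a method that logs through a non-static `Logger` must itself be non-static. A tiny sketch with made-up names (not the actual test class):

```java
// Minimal illustration of why verifyCheckpointExist dropped `static`:
// static methods have no `this`, so they cannot read instance fields.
public class LoggerScopeSketch {
    private final String log = "instance-logger"; // stand-in for a non-static Logger

    // static String fromStatic() { return log; } // would not compile

    String fromInstance() {
        return log; // fine: instance methods see instance members
    }
}
```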
Re: [PR] [FLINK-35858][Runtime/State] Namespace support for async state [flink]
Zakelly commented on code in PR #25103: URL: https://github.com/apache/flink/pull/25103#discussion_r1689514188 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java: ## @@ -56,6 +59,9 @@ public class RecordContext extends ReferenceCounted
Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]
leonardBang commented on code in PR #3448: URL: https://github.com/apache/flink-cdc/pull/3448#discussion_r1689514710 ## docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md: ## @@ -558,7 +558,10 @@ Flink periodically performs checkpoints for the Source; in the case of failover, the job When performing an incremental snapshot read, the MySQL CDC source needs an algorithm for splitting. The MySQL CDC Source uses the primary key columns to divide the table into multiple chunks. By default, the MySQL CDC source identifies the table's primary key columns and uses the first column of the primary key as the splitting column. -If the table has no primary key, the incremental snapshot read will fail; you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism. +If the table has no primary key, the user must specify `scan.incremental.snapshot.chunk.key-column`, +otherwise the incremental snapshot read will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism. +Please note that using a column that is not in the primary key as the chunk key may degrade the table's query performance. + Review Comment: Sorry, I just realized that you may not speak Chinese, and this part is not necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]
SML0127 commented on PR #3448: URL: https://github.com/apache/flink-cdc/pull/3448#issuecomment-2247463147 @leonardBang Thank you for the comments. I have modified the documents with your suggestions! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]
SML0127 commented on code in PR #3448: URL: https://github.com/apache/flink-cdc/pull/3448#discussion_r1689496974 ## docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md: ## @@ -493,7 +493,7 @@ CREATE TABLE products ( * (3) Before the snapshot read, the Source does not need database lock permissions. If you want the source to run in parallel, each parallel reader should have a unique server id, so the range of `server id` must look like `5400-6400`, -and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC Source first divides the table into multiple chunks by the table's primary key, +and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC Source first splits the table into chunks by the chunk key you specify, Review Comment: modified. ## docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md: ## @@ -543,7 +543,7 @@ When the MySQL CDC Source starts, it reads the table snapshot in parallel and then reads the table's binlog with a single parallelism. -In the snapshot phase, the snapshot is split into multiple snapshot chunks based on the table's primary key and the size of the table rows. +In the snapshot phase, the snapshot is split into multiple snapshot chunks based on the table's chunk key and the size of the table rows. Review Comment: modified. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35835][test] Make file-merging test tolerate the scenario that source does not produce any record [flink]
fredia commented on code in PR #25116: URL: https://github.com/apache/flink/pull/25116#discussion_r1689498525 ## flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java: ## @@ -344,13 +343,14 @@ private static void verifyCheckpointExistOrWaitDeleted( } /** - * Traverse the checkpoint metadata and verify all the state handle is disposed. + * Traverse the checkpoint metadata and verify all the state handle exist or be disposed. * * @param metadata the metadata to traverse. + * @param exist all corresponding files of the state handle should exist * @return true if all corresponding files are deleted. */ -private static boolean verifyCheckpointDisposed(CheckpointMetadata metadata) { -AtomicBoolean disposed = new AtomicBoolean(true); +private boolean verifyCheckpointExist(CheckpointMetadata metadata, boolean exist) { Review Comment: Why remove the `static` mark? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35888][cdc-connector][paimon] Add e2e test for PaimonDataSink. [flink-cdc]
yuxiqian commented on code in PR #3491: URL: https://github.com/apache/flink-cdc/pull/3491#discussion_r1689497656 ## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml: ## @@ -99,6 +107,19 @@ limitations under the License. test-jar test + +org.apache.flink +flink-connector-files +${flink.version} +test + Review Comment: I see, `paimon-flink-common` declares it as a provided dependency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]
gyfora merged PR #855: URL: https://github.com/apache/flink-kubernetes-operator/pull/855 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35857] fix redeploy failed deployment without latest checkpoint [flink-kubernetes-operator]
gyfora commented on PR #855: URL: https://github.com/apache/flink-kubernetes-operator/pull/855#issuecomment-2247447589 Thank you for the fix @chenyuzhi459 this was a very nice catch :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]
leonardBang commented on code in PR #3448: URL: https://github.com/apache/flink-cdc/pull/3448#discussion_r1689479277 ## docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md: ## @@ -543,7 +543,7 @@ After a monitored server in your MySQL cluster fails, you only need to switch the monitored ser When the MySQL CDC Source starts, it reads the table snapshot in parallel and then reads the table's binlog with a single parallelism. -In the snapshot phase, the snapshot is split into multiple snapshot chunks based on the table's primary key and the size of the table rows. +In the snapshot phase, the snapshot is split into multiple snapshot chunks based on the table's chunk key and the size of the table rows. Review Comment: In the snapshot phase, the snapshot is split into multiple chunks based on the table's chunk key and the number of data rows in the table, and then read in a distributed manner. ## docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md: ## @@ -493,7 +493,7 @@ CREATE TABLE products ( * (3) Before the snapshot read, the Source does not need database lock permissions. If you want the source to run in parallel, each parallel reader should have a unique server id, so the range of `server id` must look like `5400-6400`, -and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC Source first divides the table into multiple chunks by the table's primary key, +and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC Source first splits the table into chunks by the chunk key you specify, Review Comment: and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC Source first splits the table into chunks by the chunk key you specify ## docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md: ## @@ -558,7 +558,10 @@ Flink periodically performs checkpoints for the Source; in the case of failover, the job When performing an incremental snapshot read, the MySQL CDC source needs an algorithm for splitting. The MySQL CDC Source uses the primary key columns to divide the table into multiple chunks. By default, the MySQL CDC source identifies the table's primary key columns and uses the first column of the primary key as the splitting column. -If the table has no primary key, the incremental snapshot read will fail; you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism. +If the table has no primary key, the user must specify `scan.incremental.snapshot.chunk.key-column`, +otherwise the incremental snapshot read will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism. +Please note that using a column that is not in the primary key as the chunk key may degrade the table's query performance. Review Comment: If the table has no primary key, the user needs to specify `scan.incremental.snapshot.chunk.key-column`; otherwise the incremental snapshot read will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism. Please note: using a column that is not in the primary key as the chunk key may degrade the performance of querying data in the table. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35888][cdc-connector][paimon] Add e2e test for PaimonDataSink. [flink-cdc]
lvyanquan commented on code in PR #3491: URL: https://github.com/apache/flink-cdc/pull/3491#discussion_r1689460319 ## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml: ## @@ -99,6 +107,19 @@ limitations under the License. test-jar test + +org.apache.flink +flink-connector-files +${flink.version} +test + Review Comment: Yeah, if it is removed, we get the following exception: ![image](https://github.com/user-attachments/assets/b774d082-b903-440e-a5a0-74c11084fd5e) And flink-connector-files is also provided in the FLINK_HOME/lib directory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35888][cdc-connector][paimon] Add e2e test for PaimonDataSink. [flink-cdc]
lvyanquan commented on code in PR #3491: URL: https://github.com/apache/flink-cdc/pull/3491#discussion_r1689470540 ## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java: ## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.IOException; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** End-to-end tests for mysql cdc to Paimon pipeline job. 
*/ +@RunWith(Parameterized.class) +public class MySqlToPaimonE2eITCase extends PipelineTestEnvironment { +private static final Logger LOG = LoggerFactory.getLogger(MySqlToPaimonE2eITCase.class); + +public static final int TESTCASE_TIMEOUT_SECONDS = 60; + +private TableEnvironment tEnv; + +// -- +// MySQL Variables (we always use MySQL as the data source for easier verifying) +// -- +protected static final String MYSQL_TEST_USER = "mysqluser"; +protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + +@ClassRule +public static final MySqlContainer MYSQL = +(MySqlContainer) +new MySqlContainer( +MySqlVersion.V8_0) // v8 support both ARM and AMD architectures +.withConfigurationOverride("docker/mysql/my.cnf") +.withSetupSQL("docker/mysql/setup.sql") +.withDatabaseName("flink-test") +.withUsername("flinkuser") +.withPassword("flinkpw") +.withNetwork(NETWORK) +.withNetworkAliases("mysql") +.withLogConsumer(new Slf4jLogConsumer(LOG)); + +protected final UniqueDatabase mysqlInventoryDatabase = +new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + +public MySqlToPaimonE2eITCase() throws IOException {} + +@BeforeClass +public static void initializeContainers() { +LOG.info("Starting containers..."); +Startables.deepStart(Stream.of(MYSQL)).join(); +LOG.info("Containers are started."); +} + +@Before +public void before() throws Exception { +super.before(); +mysqlInventoryDatabase.createAndInitialize(); +tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); +} + +@After +public void after() { +super.after(); +mysqlInventoryDatabase.dropDatabase(); +} + +@Test +public void testSyncWholeDatabase() throws Exception { +
Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]
SML0127 commented on code in PR #3448: URL: https://github.com/apache/flink-cdc/pull/3448#discussion_r1689464653 ## docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md: ## @@ -558,7 +558,10 @@ Flink periodically performs checkpoints for the Source; in the case of failover, the job When performing an incremental snapshot read, the MySQL CDC source needs an algorithm for splitting. The MySQL CDC Source uses the primary key columns to divide the table into multiple chunks. By default, the MySQL CDC source identifies the table's primary key columns and uses the first column of the primary key as the splitting column. -If the table has no primary key, the incremental snapshot read will fail; you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism. +If the table has no primary key, the user must specify `scan.incremental.snapshot.chunk.key-column`, +otherwise the incremental snapshot read will fail, and you can disable `scan.incremental.snapshot.enabled` to fall back to the old snapshot reading mechanism. +Please note that using a column that is not in the primary key as the chunk key may degrade the table's query performance. + Review Comment: @leonardBang I modified the documents using a translator. Please check it out and feel free to correct any awkward points. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
RocMarshal commented on code in PR #25113: URL: https://github.com/apache/flink/pull/25113#discussion_r1689426208 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractSlotPoolServiceFactory.java: ## @@ -34,14 +36,18 @@ public abstract class AbstractSlotPoolServiceFactory implements SlotPoolServiceF @Nonnull protected final Time batchSlotTimeout; +@Nonnull protected final Duration slotRequestMaxInterval; + protected AbstractSlotPoolServiceFactory( @Nonnull Clock clock, @Nonnull Time rpcTimeout, @Nonnull Time slotIdleTimeout, -@Nonnull Time batchSlotTimeout) { +@Nonnull Time batchSlotTimeout, Review Comment: updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33874][runtime] Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler [flink]
RocMarshal commented on code in PR #25113: URL: https://github.com/apache/flink/pull/25113#discussion_r1689424572 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ## @@ -1115,6 +1116,11 @@ public CompletableFuture updateJobResourceRequirements( return CompletableFuture.completedFuture(Acknowledge.get()); } +@VisibleForTesting +public SlotPoolService getSlotPoolService() { Review Comment: Thanks for the comment. It's a purely redundant test case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35265] Implement FlinkStateSnapshot custom resource [flink-kubernetes-operator]
mateczagany commented on PR #821: URL: https://github.com/apache/flink-kubernetes-operator/pull/821#issuecomment-2247282685 Thanks for your feedback @anupamaggarwal, really appreciate it! 1. I agree, and this is what I expected to happen. Except it does not, because I forgot to add the FlinkStateSnapshot resource to the `ValidatingWebhookConfiguration` resource in the Helm chart, I will push a commit soon. 2. This PR is already quite huge, so I have decided to do the examples/docs in a separate PR, there is currently a draft for it: https://github.com/apache/flink-kubernetes-operator/pull/854 3. You are right, adding the config there would improve its visibility to the end-users, so I will do that in a future commit, thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35740][mysql] Allow column as chunk key even if not in Primary Keys [flink-cdc]
SML0127 commented on PR #3448: URL: https://github.com/apache/flink-cdc/pull/3448#issuecomment-2247249844 @leonardBang @ruanhang1993 PTAL~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35858][Runtime/State] Namespace support for async state [flink]
fredia commented on code in PR #25103: URL: https://github.com/apache/flink/pull/25103#discussion_r1689372550 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java: ## @@ -56,6 +59,9 @@ public class RecordContext extends ReferenceCounted
Re: [PR] [FLINK-35318][table-planner] use UTC timezone to handle TIMESTAMP_WITHOUT_TIME_ZONE type in RexNodeToExpressionConverter [flink]
leonardBang merged PR #25094: URL: https://github.com/apache/flink/pull/25094 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35318][table-planner] use UTC timezone to handle TIMESTAMP_WITHOUT_TIME_ZONE type in RexNodeToExpressionConverter [flink]
leonardBang merged PR #25093: URL: https://github.com/apache/flink/pull/25093 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35363] FLIP-449: Reorganization of flink-connector-jdbc [flink-connector-jdbc]
1996fanrui commented on PR #123: URL: https://github.com/apache/flink-connector-jdbc/pull/123#issuecomment-2247216861 Thanks @eskabetxe for the contribution, and thanks @RocMarshal for the review! Since both of you are active contributors to [flink-connector-jdbc](https://github.com/apache/flink-connector-jdbc) and neither of you is a committer, I will merge this PR if there is no objection before next Monday (in 5 days). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org