(flink) branch release-1.18 updated: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24752)
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.18 by this push: new e6726d3b962 [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24752) e6726d3b962 is described below commit e6726d3b962383d9a2576fe117d7566b205f514a Author: Stefan Richter AuthorDate: Tue Apr 30 22:25:38 2024 +0200 [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24752) (cherry picked from commit 80af4d502318348ba15a8f75a2a622ce9dbdc968) --- ...erFromPersistRecoverableFsDataOutputStream.java | 59 +++ .../local/LocalRecoverableFsDataOutputStream.java | 23 ++- .../AbstractRecoverableFsDataOutputStreamTest.java | 98 +++ .../LocalRecoverableFsDataOutputStreamTest.java| 188 + .../AzureBlobFsRecoverableDataOutputStream.java| 17 +- ...AzureBlobFsRecoverableDataOutputStreamTest.java | 100 +++ .../BaseHadoopFsRecoverableFsDataOutputStream.java | 12 +- .../hdfs/HadoopRecoverableFsDataOutputStream.java | 17 +- .../HadoopRecoverableFsDataOutputStreamTest.java | 89 ++ 9 files changed, 579 insertions(+), 24 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java new file mode 100644 index 000..12e3fbc7f4d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import java.io.IOException; + +/** + * {@link RecoverableFsDataOutputStream} with fixed implementation of {@link #closeForCommit()} that + * is based on using {@link #persist()} to ensure durability and creates the {@link + * org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer} from the corresponding {@link + * org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable}. + * + * @param <RESUME_RECOVERABLE> return type of #persist() + */ +public abstract class CommitterFromPersistRecoverableFsDataOutputStream< +RESUME_RECOVERABLE extends RecoverableWriter.ResumeRecoverable> +extends RecoverableFsDataOutputStream { + +/** @see RecoverableFsDataOutputStream#persist() */ +@Override +public abstract RESUME_RECOVERABLE persist() throws IOException; + +/** + * @see RecoverableFsDataOutputStream#closeForCommit() + * @param recoverable a resume recoverable to create the committer from. Typically the parameter + * is the return value of {@link #persist()}. + * @return the committer created from recoverable. + */ +protected abstract Committer createCommitterFromResumeRecoverable( +RESUME_RECOVERABLE recoverable); + +/** + * @see RecoverableFsDataOutputStream#closeForCommit() + * @implNote Calls persist to ensure durability of the written data and creates a committer + * object from the return value of {@link #persist()}. 
+ */ +@Override +public final Committer closeForCommit() throws IOException { +Committer committer = createCommitterFromResumeRecoverable(persist()); +close(); +return committer; +} +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java index cc9c88fc4f4..c273c31960e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java @@ -19,9 +19,10 @@ package org.apache.flink.core.fs.local; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.cor
(flink) branch release-1.19 updated: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24751)
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.19 by this push: new ac4aa35c6e2 [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24751) ac4aa35c6e2 is described below commit ac4aa35c6e2e2da87760ffbf45d85888b1976c2f Author: Stefan Richter AuthorDate: Tue Apr 30 22:25:27 2024 +0200 [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24751) (cherry picked from commit 80af4d502318348ba15a8f75a2a622ce9dbdc968) --- ...erFromPersistRecoverableFsDataOutputStream.java | 59 +++ .../local/LocalRecoverableFsDataOutputStream.java | 23 ++- .../AbstractRecoverableFsDataOutputStreamTest.java | 98 +++ .../LocalRecoverableFsDataOutputStreamTest.java| 188 + .../AzureBlobFsRecoverableDataOutputStream.java| 17 +- ...AzureBlobFsRecoverableDataOutputStreamTest.java | 100 +++ .../BaseHadoopFsRecoverableFsDataOutputStream.java | 12 +- .../hdfs/HadoopRecoverableFsDataOutputStream.java | 17 +- .../HadoopRecoverableFsDataOutputStreamTest.java | 89 ++ 9 files changed, 579 insertions(+), 24 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java new file mode 100644 index 000..12e3fbc7f4d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import java.io.IOException; + +/** + * {@link RecoverableFsDataOutputStream} with fixed implementation of {@link #closeForCommit()} that + * is based on using {@link #persist()} to ensure durability and creates the {@link + * org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer} from the corresponding {@link + * org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable}. + * + * @param <RESUME_RECOVERABLE> return type of #persist() + */ +public abstract class CommitterFromPersistRecoverableFsDataOutputStream< +RESUME_RECOVERABLE extends RecoverableWriter.ResumeRecoverable> +extends RecoverableFsDataOutputStream { + +/** @see RecoverableFsDataOutputStream#persist() */ +@Override +public abstract RESUME_RECOVERABLE persist() throws IOException; + +/** + * @see RecoverableFsDataOutputStream#closeForCommit() + * @param recoverable a resume recoverable to create the committer from. Typically the parameter + * is the return value of {@link #persist()}. + * @return the committer created from recoverable. + */ +protected abstract Committer createCommitterFromResumeRecoverable( +RESUME_RECOVERABLE recoverable); + +/** + * @see RecoverableFsDataOutputStream#closeForCommit() + * @implNote Calls persist to ensure durability of the written data and creates a committer + * object from the return value of {@link #persist()}. 
+ */ +@Override +public final Committer closeForCommit() throws IOException { +Committer committer = createCommitterFromResumeRecoverable(persist()); +close(); +return committer; +} +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java index cc9c88fc4f4..c273c31960e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java @@ -19,9 +19,10 @@ package org.apache.flink.core.fs.local; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.cor
(flink) branch master updated: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722)
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 80af4d50231 [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) 80af4d50231 is described below commit 80af4d502318348ba15a8f75a2a622ce9dbdc968 Author: Stefan Richter AuthorDate: Tue Apr 30 13:36:11 2024 +0200 [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) --- ...erFromPersistRecoverableFsDataOutputStream.java | 59 +++ .../local/LocalRecoverableFsDataOutputStream.java | 23 ++- .../AbstractRecoverableFsDataOutputStreamTest.java | 97 +++ .../LocalRecoverableFsDataOutputStreamTest.java| 188 + .../AzureBlobFsRecoverableDataOutputStream.java| 17 +- ...AzureBlobFsRecoverableDataOutputStreamTest.java | 100 +++ .../BaseHadoopFsRecoverableFsDataOutputStream.java | 12 +- .../hdfs/HadoopRecoverableFsDataOutputStream.java | 17 +- .../HadoopRecoverableFsDataOutputStreamTest.java | 89 ++ 9 files changed, 578 insertions(+), 24 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java new file mode 100644 index 000..12e3fbc7f4d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import java.io.IOException; + +/** + * {@link RecoverableFsDataOutputStream} with fixed implementation of {@link #closeForCommit()} that + * is based on using {@link #persist()} to ensure durability and creates the {@link + * org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer} from the corresponding {@link + * org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable}. + * + * @param <RESUME_RECOVERABLE> return type of #persist() + */ +public abstract class CommitterFromPersistRecoverableFsDataOutputStream< +RESUME_RECOVERABLE extends RecoverableWriter.ResumeRecoverable> +extends RecoverableFsDataOutputStream { + +/** @see RecoverableFsDataOutputStream#persist() */ +@Override +public abstract RESUME_RECOVERABLE persist() throws IOException; + +/** + * @see RecoverableFsDataOutputStream#closeForCommit() + * @param recoverable a resume recoverable to create the committer from. Typically the parameter + * is the return value of {@link #persist()}. + * @return the committer created from recoverable. + */ +protected abstract Committer createCommitterFromResumeRecoverable( +RESUME_RECOVERABLE recoverable); + +/** + * @see RecoverableFsDataOutputStream#closeForCommit() + * @implNote Calls persist to ensure durability of the written data and creates a committer + * object from the return value of {@link #persist()}. 
+ */ +@Override +public final Committer closeForCommit() throws IOException { +Committer committer = createCommitterFromResumeRecoverable(persist()); +close(); +return committer; +} +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java index cc9c88fc4f4..c273c31960e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java @@ -19,9 +19,10 @@ package org.apache.flink.core.fs.local; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.CommitterFromPersistRecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableFsDataOu
(flink) branch master updated: [FLINK-34902][table] Fix IndexOutOfBoundsException for VALUES
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 8a75f8f22b8 [FLINK-34902][table] Fix IndexOutOfBoundsException for VALUES 8a75f8f22b8 is described below commit 8a75f8f22b8e08e45f1a6453b87718eab6db115a Author: Jeyhun Karimov AuthorDate: Tue Apr 30 10:51:37 2024 +0200 [FLINK-34902][table] Fix IndexOutOfBoundsException for VALUES This closes #24724. --- .../planner/calcite/PreValidateReWriter.scala | 8 ++- .../calcite/FlinkCalciteSqlValidatorTest.java | 62 +- 2 files changed, 66 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala index 985f252e406..5433733d140 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala @@ -241,8 +241,12 @@ object PreValidateReWriter { } rewriteSelect(validator, sqlSelect, targetRowType, assignedFields, targetPosition) case SqlKind.VALUES => -if (targetPosition.nonEmpty && call.operandCount() != targetPosition.size()) { - throw newValidationError(call, RESOURCE.columnCountMismatch()) +call.getOperandList.toSeq.foreach { + case sqlCall: SqlCall => { +if (targetPosition.nonEmpty && sqlCall.getOperandList.size() != targetPosition.size()) { + throw newValidationError(call, RESOURCE.columnCountMismatch()) +} + } } rewriteValues(call, targetRowType, assignedFields, targetPosition) case kind if SqlKind.SET_QUERY.contains(kind) => diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java index 6485217dc26..bca80828e5b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.planner.utils.PlannerMocks; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; /** Test for {@link FlinkCalciteSqlValidator}. */ class FlinkCalciteSqlValidatorTest { @@ -50,19 +51,76 @@ class FlinkCalciteSqlValidatorTest { } @Test -void testInsertInto1() { +void testInsertIntoShouldColumnMismatchWithValues() { assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES(1)")) .isInstanceOf(ValidationException.class) .hasMessageContaining(" Number of columns must match number of query columns"); } @Test -void testInsertInto2() { +void testInsertIntoShouldColumnMismatchWithSelect() { assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) SELECT 1")) .isInstanceOf(ValidationException.class) .hasMessageContaining(" Number of columns must match number of query columns"); } +@Test +void testInsertIntoShouldColumnMismatchWithLastValue() { +assertThatThrownBy( +() -> +plannerMocks +.getParser() +.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3)")) +.isInstanceOf(ValidationException.class) +.hasMessageContaining(" Number of columns must match number of query columns"); +} + +@Test +void testInsertIntoShouldColumnMismatchWithFirstValue() { +assertThatThrownBy( +() -> +plannerMocks +.getParser() +.parse("INSERT INTO t2 (a,b) VALUES (1), (2,3)")) +.isInstanceOf(ValidationException.class) +.hasMessageContaining(" Number of columns must match number of query columns"); +} 
+ +@Test +void testInsertIntoShouldColumnMismatchWithMultiFieldValues() { +assertThatThrownBy( +() -> +plannerMocks +.getParser() +