(flink) branch release-1.18 updated: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24752)

2024-04-30 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
 new e6726d3b962 [FLINK-35217] Add missing fsync to #closeForCommit in some 
subclasses of RecoverableFsDataOutputStream. (#24722) (#24752)
e6726d3b962 is described below

commit e6726d3b962383d9a2576fe117d7566b205f514a
Author: Stefan Richter 
AuthorDate: Tue Apr 30 22:25:38 2024 +0200

[FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of 
RecoverableFsDataOutputStream. (#24722) (#24752)

(cherry picked from commit 80af4d502318348ba15a8f75a2a622ce9dbdc968)
---
 ...erFromPersistRecoverableFsDataOutputStream.java |  59 +++
 .../local/LocalRecoverableFsDataOutputStream.java  |  23 ++-
 .../AbstractRecoverableFsDataOutputStreamTest.java |  98 +++
 .../LocalRecoverableFsDataOutputStreamTest.java| 188 +
 .../AzureBlobFsRecoverableDataOutputStream.java|  17 +-
 ...AzureBlobFsRecoverableDataOutputStreamTest.java | 100 +++
 .../BaseHadoopFsRecoverableFsDataOutputStream.java |  12 +-
 .../hdfs/HadoopRecoverableFsDataOutputStream.java  |  17 +-
 .../HadoopRecoverableFsDataOutputStreamTest.java   |  89 ++
 9 files changed, 579 insertions(+), 24 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
new file mode 100644
index 000..12e3fbc7f4d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import java.io.IOException;
+
+/**
+ * {@link RecoverableFsDataOutputStream} with fixed implementation of {@link 
#closeForCommit()} that
+ * is based on using {@link #persist()} to ensure durability and creates the 
{@link
+ * org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer} from the 
corresponding {@link
+ * org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable}.
+ *
+ * @param  return type of #persist()
+ */
+public abstract class CommitterFromPersistRecoverableFsDataOutputStream<
+RESUME_RECOVERABLE extends RecoverableWriter.ResumeRecoverable>
+extends RecoverableFsDataOutputStream {
+
+/** @see RecoverableFsDataOutputStream#persist() */
+@Override
+public abstract RESUME_RECOVERABLE persist() throws IOException;
+
+/**
+ * @see RecoverableFsDataOutputStream#closeForCommit()
+ * @param recoverable a resume recoverable to create the committer from. 
Typically the parameter
+ * is the return value of {@link #persist()}.
+ * @return the committer created from recoverable.
+ */
+protected abstract Committer createCommitterFromResumeRecoverable(
+RESUME_RECOVERABLE recoverable);
+
+/**
+ * @see RecoverableFsDataOutputStream#closeForCommit()
+ * @implNote Calls persist to ensure durability of the written data and 
creates a committer
+ * object from the return value of {@link #persist()}.
+ */
+@Override
+public final Committer closeForCommit() throws IOException {
+Committer committer = createCommitterFromResumeRecoverable(persist());
+close();
+return committer;
+}
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index cc9c88fc4f4..c273c31960e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -19,9 +19,10 @@
 package org.apache.flink.core.fs.local;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.cor

(flink) branch release-1.19 updated: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24751)

2024-04-30 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
 new ac4aa35c6e2 [FLINK-35217] Add missing fsync to #closeForCommit in some 
subclasses of RecoverableFsDataOutputStream. (#24722) (#24751)
ac4aa35c6e2 is described below

commit ac4aa35c6e2e2da87760ffbf45d85888b1976c2f
Author: Stefan Richter 
AuthorDate: Tue Apr 30 22:25:27 2024 +0200

[FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of 
RecoverableFsDataOutputStream. (#24722) (#24751)

(cherry picked from commit 80af4d502318348ba15a8f75a2a622ce9dbdc968)
---
 ...erFromPersistRecoverableFsDataOutputStream.java |  59 +++
 .../local/LocalRecoverableFsDataOutputStream.java  |  23 ++-
 .../AbstractRecoverableFsDataOutputStreamTest.java |  98 +++
 .../LocalRecoverableFsDataOutputStreamTest.java| 188 +
 .../AzureBlobFsRecoverableDataOutputStream.java|  17 +-
 ...AzureBlobFsRecoverableDataOutputStreamTest.java | 100 +++
 .../BaseHadoopFsRecoverableFsDataOutputStream.java |  12 +-
 .../hdfs/HadoopRecoverableFsDataOutputStream.java  |  17 +-
 .../HadoopRecoverableFsDataOutputStreamTest.java   |  89 ++
 9 files changed, 579 insertions(+), 24 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
new file mode 100644
index 000..12e3fbc7f4d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import java.io.IOException;
+
+/**
+ * {@link RecoverableFsDataOutputStream} with fixed implementation of {@link 
#closeForCommit()} that
+ * is based on using {@link #persist()} to ensure durability and creates the 
{@link
+ * org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer} from the 
corresponding {@link
+ * org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable}.
+ *
+ * @param  return type of #persist()
+ */
+public abstract class CommitterFromPersistRecoverableFsDataOutputStream<
+RESUME_RECOVERABLE extends RecoverableWriter.ResumeRecoverable>
+extends RecoverableFsDataOutputStream {
+
+/** @see RecoverableFsDataOutputStream#persist() */
+@Override
+public abstract RESUME_RECOVERABLE persist() throws IOException;
+
+/**
+ * @see RecoverableFsDataOutputStream#closeForCommit()
+ * @param recoverable a resume recoverable to create the committer from. 
Typically the parameter
+ * is the return value of {@link #persist()}.
+ * @return the committer created from recoverable.
+ */
+protected abstract Committer createCommitterFromResumeRecoverable(
+RESUME_RECOVERABLE recoverable);
+
+/**
+ * @see RecoverableFsDataOutputStream#closeForCommit()
+ * @implNote Calls persist to ensure durability of the written data and 
creates a committer
+ * object from the return value of {@link #persist()}.
+ */
+@Override
+public final Committer closeForCommit() throws IOException {
+Committer committer = createCommitterFromResumeRecoverable(persist());
+close();
+return committer;
+}
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index cc9c88fc4f4..c273c31960e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -19,9 +19,10 @@
 package org.apache.flink.core.fs.local;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.cor

(flink) branch master updated: [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722)

2024-04-30 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 80af4d50231 [FLINK-35217] Add missing fsync to #closeForCommit in some 
subclasses of RecoverableFsDataOutputStream. (#24722)
80af4d50231 is described below

commit 80af4d502318348ba15a8f75a2a622ce9dbdc968
Author: Stefan Richter 
AuthorDate: Tue Apr 30 13:36:11 2024 +0200

[FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of 
RecoverableFsDataOutputStream. (#24722)
---
 ...erFromPersistRecoverableFsDataOutputStream.java |  59 +++
 .../local/LocalRecoverableFsDataOutputStream.java  |  23 ++-
 .../AbstractRecoverableFsDataOutputStreamTest.java |  97 +++
 .../LocalRecoverableFsDataOutputStreamTest.java| 188 +
 .../AzureBlobFsRecoverableDataOutputStream.java|  17 +-
 ...AzureBlobFsRecoverableDataOutputStreamTest.java | 100 +++
 .../BaseHadoopFsRecoverableFsDataOutputStream.java |  12 +-
 .../hdfs/HadoopRecoverableFsDataOutputStream.java  |  17 +-
 .../HadoopRecoverableFsDataOutputStreamTest.java   |  89 ++
 9 files changed, 578 insertions(+), 24 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
new file mode 100644
index 000..12e3fbc7f4d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import java.io.IOException;
+
+/**
+ * {@link RecoverableFsDataOutputStream} with fixed implementation of {@link 
#closeForCommit()} that
+ * is based on using {@link #persist()} to ensure durability and creates the 
{@link
+ * org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer} from the 
corresponding {@link
+ * org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable}.
+ *
+ * @param  return type of #persist()
+ */
+public abstract class CommitterFromPersistRecoverableFsDataOutputStream<
+RESUME_RECOVERABLE extends RecoverableWriter.ResumeRecoverable>
+extends RecoverableFsDataOutputStream {
+
+/** @see RecoverableFsDataOutputStream#persist() */
+@Override
+public abstract RESUME_RECOVERABLE persist() throws IOException;
+
+/**
+ * @see RecoverableFsDataOutputStream#closeForCommit()
+ * @param recoverable a resume recoverable to create the committer from. 
Typically the parameter
+ * is the return value of {@link #persist()}.
+ * @return the committer created from recoverable.
+ */
+protected abstract Committer createCommitterFromResumeRecoverable(
+RESUME_RECOVERABLE recoverable);
+
+/**
+ * @see RecoverableFsDataOutputStream#closeForCommit()
+ * @implNote Calls persist to ensure durability of the written data and 
creates a committer
+ * object from the return value of {@link #persist()}.
+ */
+@Override
+public final Committer closeForCommit() throws IOException {
+Committer committer = createCommitterFromResumeRecoverable(persist());
+close();
+return committer;
+}
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index cc9c88fc4f4..c273c31960e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -19,9 +19,10 @@
 package org.apache.flink.core.fs.local;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.core.fs.CommitterFromPersistRecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableFsDataOu

(flink) branch master updated: [FLINK-34902][table] Fix IndexOutOfBoundsException for VALUES

2024-04-30 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 8a75f8f22b8 [FLINK-34902][table] Fix IndexOutOfBoundsException for 
VALUES
8a75f8f22b8 is described below

commit 8a75f8f22b8e08e45f1a6453b87718eab6db115a
Author: Jeyhun Karimov 
AuthorDate: Tue Apr 30 10:51:37 2024 +0200

[FLINK-34902][table] Fix IndexOutOfBoundsException for VALUES

This closes #24724.
---
 .../planner/calcite/PreValidateReWriter.scala  |  8 ++-
 .../calcite/FlinkCalciteSqlValidatorTest.java  | 62 +-
 2 files changed, 66 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 985f252e406..5433733d140 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -241,8 +241,12 @@ object PreValidateReWriter {
 }
 rewriteSelect(validator, sqlSelect, targetRowType, assignedFields, 
targetPosition)
   case SqlKind.VALUES =>
-if (targetPosition.nonEmpty && call.operandCount() != 
targetPosition.size()) {
-  throw newValidationError(call, RESOURCE.columnCountMismatch())
+call.getOperandList.toSeq.foreach {
+  case sqlCall: SqlCall => {
+if (targetPosition.nonEmpty && sqlCall.getOperandList.size() != 
targetPosition.size()) {
+  throw newValidationError(call, RESOURCE.columnCountMismatch())
+}
+  }
 }
 rewriteValues(call, targetRowType, assignedFields, targetPosition)
   case kind if SqlKind.SET_QUERY.contains(kind) =>
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java
index 6485217dc26..bca80828e5b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.utils.PlannerMocks;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 /** Test for {@link FlinkCalciteSqlValidator}. */
 class FlinkCalciteSqlValidatorTest {
@@ -50,19 +51,76 @@ class FlinkCalciteSqlValidatorTest {
 }
 
 @Test
-void testInsertInto1() {
+void testInsertIntoShouldColumnMismatchWithValues() {
 assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO 
t2 (a,b) VALUES(1)"))
 .isInstanceOf(ValidationException.class)
 .hasMessageContaining(" Number of columns must match number of 
query columns");
 }
 
 @Test
-void testInsertInto2() {
+void testInsertIntoShouldColumnMismatchWithSelect() {
 assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO 
t2 (a,b) SELECT 1"))
 .isInstanceOf(ValidationException.class)
 .hasMessageContaining(" Number of columns must match number of 
query columns");
 }
 
+@Test
+void testInsertIntoShouldColumnMismatchWithLastValue() {
+assertThatThrownBy(
+() ->
+plannerMocks
+.getParser()
+.parse("INSERT INTO t2 (a,b) VALUES 
(1,2), (3)"))
+.isInstanceOf(ValidationException.class)
+.hasMessageContaining(" Number of columns must match number of 
query columns");
+}
+
+@Test
+void testInsertIntoShouldColumnMismatchWithFirstValue() {
+assertThatThrownBy(
+() ->
+plannerMocks
+.getParser()
+.parse("INSERT INTO t2 (a,b) VALUES 
(1), (2,3)"))
+.isInstanceOf(ValidationException.class)
+.hasMessageContaining(" Number of columns must match number of 
query columns");
+}
+
+@Test
+void testInsertIntoShouldColumnMismatchWithMultiFieldValues() {
+assertThatThrownBy(
+() ->
+plannerMocks
+.getParser()
+