Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-28 Thread via GitHub


dawidwys closed pull request #23603: [FLINK-33375] Implement restore test base
URL: https://github.com/apache/flink/pull/23603


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


dawidwys commented on PR #23603:
URL: https://github.com/apache/flink/pull/23603#issuecomment-1783097478

   I'll squash all commits that do not start with `[xxx]` once the CI is green and the PR is approved.





Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


dawidwys commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374631884


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java:
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */
+public class CalcTestProgram {
+
+    static final TableTestProgram SIMPLE_CALC =
+            TableTestProgram.of("simple-calc", "Simple calc with sources and sinks")
+                    .runSql("INSERT INTO sink_t SELECT a + 1, b FROM t")
+                    .setupTableSource("t")
+                    .withSchema("a BIGINT", "b DOUBLE")
+                    .withValuesBeforeRestore(Row.of(420L, 42.0))
+                    .withValuesAfterRestore(Row.of(421L, 42.1))
+                    .complete()
+                    .setupTableSink("sink_t")
+                    .withSchema("a BIGINT", "b DOUBLE")
+                    .withValuesBeforeRestore(Row.of(421L, 42.0))
+                    .withValuesAfterRestore(Row.of(421L, 42.0), Row.of(422L, 42.1))

Review Comment:
   Sure, I'll do it.
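For reference, the expected sink rows in `SIMPLE_CALC` follow from applying `SELECT a + 1, b` to each source row. A minimal plain-Java check of that arithmetic, using a hypothetical `CalcProjection` helper that renders rows as `"a, b"` strings instead of Flink's `Row` type:

```java
/**
 * Hypothetical helper (not part of the PR) that recomputes the expected sink rows of
 * SIMPLE_CALC by hand. Rows are rendered as "a, b" strings instead of Flink's Row type.
 */
final class CalcProjection {

    /** Applies the projection "a + 1, b" to one (BIGINT, DOUBLE) source row. */
    static String project(long a, double b) {
        return (a + 1) + ", " + b;
    }
}
```

`project(420L, 42.0)` yields `"421, 42.0"` (the before-restore sink row) and `project(421L, 42.1)` yields `"422, 42.1"` (the new after-restore row), matching the values declared in the program.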






Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


dawidwys commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374632320


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.PlanReference;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.SqlTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.table.test.program.TableTestProgramRunner;
+import org.apache.flink.table.test.program.TestStep.TestKind;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for implementing restore tests for {@link ExecNode}. You can generate a JSON compiled
+ * plan and a savepoint for the latest node version by running {@link
+ * RestoreTestBase#generateTestSetupFiles(TableTestProgram)}, which is disabled by default.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class RestoreTestBase implements TableTestProgramRunner {
+
+    private final Class<? extends ExecNode<?>> execNodeUnderTest;
+
+    protected RestoreTestBase(Class<? extends ExecNode<?>> execNodeUnderTest) {
+        this.execNodeUnderTest = execNodeUnderTest;
+    }
+
+    @Override
+    public EnumSet<TestKind> supportedSetupSteps() {
+        return EnumSet.of(TestKind.SOURCE_WITH_RESTORE_DATA, TestKind.SINK_WITH_RESTORE_DATA);
+    }
+
+    @Override
+    public EnumSet<TestKind> supportedRunSteps() {
+        return EnumSet.of(TestKind.SQL);
+    }
+
+    private @TempDir Path tmpDir;
+
+    private List<Integer> getVersions() {
+        return ExecNodeMetadataUtil.extractMetadataFromAnnotation(execNodeUnderTest).stream()
+                .map(ExecNodeMetadata::version)
+                .collect(Collectors.toList());
+    }
+
+    private int getCurrentVersion() {
+        return ExecNodeMetadataUtil.latestAnnotation(execNodeUnderTest).version();
+    }
+
+    private Stream<Arguments> createSpecs() {
+        return getVersions().stream()
+                .flatMap(
+                        version -> supportedPrograms().stream().map(p -> Arguments.of(version, p)));
+    }
+
+    /**
+     * Execute this test to generate test files. Remember to use the correct branch when
+     * generating the test files.
+     */
+    @Disabled
+    @ParameterizedTest
+    @MethodSource("supportedPrograms")
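The `createSpecs()` helper above builds the cross product of every annotated `ExecNode` version with every supported program. A plain-Java sketch of the same `flatMap` pattern, assuming strings stand in for `TableTestProgram` and a hypothetical `Spec` class stands in for JUnit's `Arguments`:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical sketch of the (version, program) cross product built by createSpecs().
 * Strings stand in for TableTestProgram; Spec stands in for JUnit's Arguments.
 */
final class SpecMatrix {

    /** One (version, program) combination that a parameterized test would run. */
    static final class Spec {
        final int version;
        final String program;

        Spec(int version, String program) {
            this.version = version;
            this.program = program;
        }

        @Override
        public String toString() {
            return "v" + version + "/" + program;
        }
    }

    /** Pairs every version with every program, in version-major order. */
    static Stream<Spec> createSpecs(List<Integer> versions, List<String> programs) {
        return versions.stream()
                .flatMap(version -> programs.stream().map(p -> new Spec(version, p)));
    }

    /** Renders all combinations as "v<version>/<program>" labels. */
    static List<String> describe(List<Integer> versions, List<String> programs) {
        return createSpecs(versions, programs).map(Spec::toString).collect(Collectors.toList());
    }
}
```

With two versions and two programs this yields four specs, so each program is exercised once per node version.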
+

Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


twalthr commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374547838


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##
@@ -0,0 +1,229 @@
+@Disabled
+@ParameterizedTest
+@MethodSource("supportedPrograms")
+

Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


twalthr commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374546134


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java:
##
@@ -0,0 +1,42 @@
+    static final TableTestProgram SIMPLE_CALC =
+            TableTestProgram.of("simple-calc", "Simple calc with sources and sinks")
+                    .runSql("INSERT INTO sink_t SELECT a + 1, b FROM t")
+                    .setupTableSource("t")
+                    .withSchema("a BIGINT", "b DOUBLE")
+                    .withValuesBeforeRestore(Row.of(420L, 42.0))
+                    .withValuesAfterRestore(Row.of(421L, 42.1))
+                    .complete()
+                    .setupTableSink("sink_t")
+                    .withSchema("a BIGINT", "b DOUBLE")
+                    .withValuesBeforeRestore(Row.of(421L, 42.0))
+                    .withValuesAfterRestore(Row.of(421L, 42.0), Row.of(422L, 42.1))

Review Comment:
   But could we implicitly add the before-restore values to the after-restore list? Defining the same data twice does not improve readability. We can also add information to the JavaDoc of the builder to make this clearer.
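A minimal sketch of what that suggestion could look like, assuming a hypothetical `SinkExpectationBuilder` (not the PR's actual builder) where rows are plain strings: `withValuesAfterRestore` lists only the new rows, and the before-restore rows are prepended implicitly when the full expectation is read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical builder illustrating the suggestion: after-restore expectations list only the
 * new rows; the rows expected before the restore are prepended implicitly.
 */
final class SinkExpectationBuilder {
    private final List<String> beforeRestore = new ArrayList<>();
    private final List<String> newAfterRestore = new ArrayList<>();

    /** Rows the sink is expected to contain when the savepoint is taken. */
    SinkExpectationBuilder withValuesBeforeRestore(String... rows) {
        beforeRestore.addAll(Arrays.asList(rows));
        return this;
    }

    /** Only the rows produced after restoring; the earlier rows are added implicitly. */
    SinkExpectationBuilder withValuesAfterRestore(String... rows) {
        newAfterRestore.addAll(Arrays.asList(rows));
        return this;
    }

    List<String> expectedBeforeRestore() {
        return Collections.unmodifiableList(beforeRestore);
    }

    /** Full expected sink content after restore: earlier rows followed by new rows. */
    List<String> expectedAfterRestore() {
        List<String> all = new ArrayList<>(beforeRestore);
        all.addAll(newAfterRestore);
        return Collections.unmodifiableList(all);
    }
}
```

The trade-off is that the after-restore call no longer shows the complete sink content at a glance, which is why documenting the behavior in the builder's JavaDoc matters.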






Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


dawidwys commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374485571


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java:
##
@@ -613,8 +610,13 @@ public void writeRecord(RowData value) throws IOException {
             assertThat(row).isNotNull();
             synchronized (LOCK) {
                 localRawResult.add(row);
-                notifyRawResultsObservers(
-                        tableName, getRuntimeContext().getIndexOfThisSubtask(), localRawResult);
+                Optional.ofNullable(localRawResultsObservers.get(tableName))
+                        .orElse(Collections.emptyList())
+                        .forEach(
+                                c ->
+                                        c.accept(
+                                                getRuntimeContext().getIndexOfThisSubtask(),
+                                                localRawResult));

Review Comment:
   This class does not have a common parent with the 3 above.
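The notification block in the diff can be read in isolation as follows. This is a standalone sketch with hypothetical names (`RawResultNotifier`, `notifyObservers`) and assumed types (observers as `BiConsumer<Integer, List<String>>`); the real class uses Flink's own row and observer types.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

/** Hypothetical standalone version of the observer notification shown in the diff. */
final class RawResultNotifier {

    /**
     * Calls every observer registered for {@code tableName} with the subtask index and the
     * current raw results; does nothing if no observer is registered for that table.
     */
    static void notifyObservers(
            Map<String, List<BiConsumer<Integer, List<String>>>> observers,
            String tableName,
            int subtaskIndex,
            List<String> rawResult) {
        Optional.ofNullable(observers.get(tableName))
                .orElse(Collections.emptyList())
                .forEach(c -> c.accept(subtaskIndex, rawResult));
    }
}
```

`observers.getOrDefault(tableName, Collections.emptyList())` would behave the same as long as the map never stores `null` values; the `Optional` chain also tolerates an explicit `null` mapping.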






Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


dawidwys commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374466579


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##
@@ -0,0 +1,229 @@
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class RestoreTestBase implements TableTestProgramRunner {

Review Comment:
   I used `@ExtendWith(MiniClusterExtension.class)`, which is the JUnit 5 equivalent.






Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


dawidwys commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374442591


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java:
##
@@ -0,0 +1,42 @@
+    static final TableTestProgram SIMPLE_CALC =
+            TableTestProgram.of("simple-calc", "Simple calc with sources and sinks")
+                    .runSql("INSERT INTO sink_t SELECT a + 1, b FROM t")
+                    .setupTableSource("t")
+                    .withSchema("a BIGINT", "b DOUBLE")
+                    .withValuesBeforeRestore(Row.of(420L, 42.0))
+                    .withValuesAfterRestore(Row.of(421L, 42.1))
+                    .complete()
+                    .setupTableSink("sink_t")
+                    .withSchema("a BIGINT", "b DOUBLE")
+                    .withValuesBeforeRestore(Row.of(421L, 42.0))
+                    .withValuesAfterRestore(Row.of(421L, 42.0), Row.of(422L, 42.1))

Review Comment:
   It should. The sink is implemented so that it stores all seen records in its state. I believe this is also helpful, because it actually checks that the savepoint has been restored.
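The reasoning above can be modelled in plain Java. This is a hypothetical `AccumulatingSinkModel` (not Flink's sink or state API) that keeps every record it has seen, takes a copy as a stand-in for a savepoint, and is rebuilt from that copy on restore:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java model (not the Flink sink API) of a sink that keeps every record it
 * has seen in its state, so its contents after a restore include records written before the
 * savepoint.
 */
final class AccumulatingSinkModel {
    private final List<String> seenRecords = new ArrayList<>();

    /** Stores one incoming row in the (simulated) state. */
    void write(String row) {
        seenRecords.add(row);
    }

    /** Copies the state, standing in for taking a savepoint. */
    List<String> snapshot() {
        return new ArrayList<>(seenRecords);
    }

    /** Creates a fresh sink instance initialized from a previous snapshot. */
    static AccumulatingSinkModel restoreFrom(List<String> snapshot) {
        AccumulatingSinkModel sink = new AccumulatingSinkModel();
        sink.seenRecords.addAll(snapshot);
        return sink;
    }

    /** Returns a copy of everything the sink currently holds. */
    List<String> contents() {
        return new ArrayList<>(seenRecords);
    }
}
```

If the restore were silently skipped, the new sink instance would start empty and hold only the post-restore row, so comparing against the full before-plus-after list detects a missing restore.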






Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


dawidwys commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374420759


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##
@@ -879,6 +890,7 @@ private TestValuesScanTableSourceWithoutProjectionPushDown(
 DataType producedDataType,
 ChangelogMode changelogMode,
 boolean bounded,
+boolean finite,

Review Comment:
   No, it was to be consistent with `bounded`. I did not want to change the class too much. I'll do that, though.






Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


dawidwys commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374420333


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##
@@ -981,16 +998,25 @@ public boolean isBounded() {
                 case "NewSource":
                     checkArgument(
                             !failingSource, "Values Source doesn't support as failing new source.");
+                    final FinishingLogic finishingLogic =
+                            finite ? FinishingLogic.FINITE : FinishingLogic.INFINITE;
+                    final Boundedness boundedness =

Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


dawidwys commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374384507


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##
@@ -0,0 +1,229 @@
+@Disabled
+@ParameterizedTest
+@MethodSource("supportedPrograms")
+

Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


twalthr commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374365391


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java:
##
@@ -0,0 +1,42 @@
+/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */
+public class CalcTestProgram {

Review Comment:
   make it plural



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java:
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */
+public class CalcTestProgram {
+
+static final TableTestProgram SIMPLE_CALC =
+TableTestProgram.of("simple-calc", "Simple calc with sources and sinks")
+.runSql("INSERT INTO sink_t SELECT a + 1, b FROM t")

Review Comment:
   nit: move `runSql` to the end of the builder?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java:
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */
+public class CalcTestProgram {
+
+static final TableTestProgram SIMPLE_CALC =
+TableTestProgram.of("simple-calc", "Simple calc with sources and sinks")

Review Comment:
   Follow the guidelines in `TableTestProgram.of`; maybe we need automated checks for this :D



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license 

Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


twalthr commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374362240


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java:
##
@@ -378,6 +395,8 @@ public void invoke(RowData value, Context context) throws Exception {
 }
 synchronized (LOCK) {
 localRawResult.add((Row) converter.toExternal(value));
+notifyRawResultsObservers(

Review Comment:
   This method is called 4 times with the same args? Maybe create a method in the superclass?
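One way to act on this suggestion, sketched under assumptions: a hypothetical `AbstractObservableSink` base class owns the result list, the lock and the observers, and exposes a single `addRawResult` helper that every sink calls. Rows are modeled as `String` instead of Flink's `Row`, and none of these names come from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical base class; names are illustrative, not taken from TestValuesRuntimeFunctions. */
abstract class AbstractObservableSink {
    private final Object lock = new Object();
    protected final List<String> localRawResult = new ArrayList<>();
    private final List<Consumer<List<String>>> observers = new ArrayList<>();

    void registerObserver(Consumer<List<String>> observer) {
        synchronized (lock) {
            observers.add(observer);
        }
    }

    /** The one place that appends a row and notifies observers with an immutable snapshot. */
    protected final void addRawResult(String row) {
        synchronized (lock) {
            localRawResult.add(row);
            List<String> snapshot = List.copyOf(localRawResult);
            observers.forEach(observer -> observer.accept(snapshot));
        }
    }
}

/** Each concrete sink calls the shared helper instead of repeating the notify call. */
class AppendOnlySink extends AbstractObservableSink {
    void invoke(String value) {
        addRawResult(value);
    }
}
```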



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-27 Thread via GitHub


twalthr commented on code in PR #23603:
URL: https://github.com/apache/flink/pull/23603#discussion_r1374350175


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##
@@ -981,16 +998,25 @@ public boolean isBounded() {
 case "NewSource":
 checkArgument(
 !failingSource, "Values Source doesn't support as failing new source.");
+final FinishingLogic finishingLogic =
+finite ? FinishingLogic.FINITE : FinishingLogic.INFINITE;
+final Boundedness boundedness =

Review Comment:
   do we have a check somewhere that infinite + bounded makes no sense?
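If no such check exists, a minimal validation could look like the sketch below. `ValuesSourceOptionsValidator` is hypothetical; it only assumes the two boolean options `bounded` and `finite` from this PR and rejects the bounded + infinite combination the comment questions.

```java
/** Hypothetical validator for the "bounded" and "finite" options discussed in this PR. */
final class ValuesSourceOptionsValidator {
    private ValuesSourceOptionsValidator() {}

    static void validate(boolean bounded, boolean finite) {
        // A bounded source ends after emitting its data, so asking it to keep
        // running after that (finite = false) is contradictory.
        if (bounded && !finite) {
            throw new IllegalArgumentException(
                    "'bounded' = true requires 'finite' = true; a bounded infinite source makes no sense.");
        }
    }

    public static void main(String[] args) {
        validate(false, false); // unbounded, keeps running: accepted
        validate(false, true); // unbounded, shut down after all data: accepted
        validate(true, true); // bounded and finite: accepted
        try {
            validate(true, false);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```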



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##
@@ -879,6 +890,7 @@ private TestValuesScanTableSourceWithoutProjectionPushDown(
 DataType producedDataType,
 ChangelogMode changelogMode,
 boolean bounded,
+boolean finite,

Review Comment:
   Use the enum here as well? Also for bounded? Or is it due to the serialization?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/FinishingLogic.java:
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+/**
+ * Tells sources created from {@link TestValuesTableFactory} how they should behave after producing
+ * all data.

Review Comment:
   This might be confusing for people. Explain that "even if the source is declared unbounded, we can internally shut it down"?
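A possible wording for that Javadoc, sketched on a stand-alone copy of the enum. The `FINITE`/`INFINITE` constants appear in the PR; the per-constant descriptions and the `shouldFinish` helper are assumptions added for illustration.

```java
/**
 * Tells a test source what to do once it has emitted all of its data.
 *
 * <p>This is independent of the declared boundedness: even a source declared
 * unbounded can be shut down internally once its data is exhausted.
 */
enum FinishingLogic {
    /** Finish as soon as all data has been emitted. */
    FINITE,
    /** Keep running (idle) after all data has been emitted. */
    INFINITE;

    /** Hypothetical helper: should the reader signal end of input at this point? */
    boolean shouldFinish(boolean allDataEmitted) {
        return this == FINITE && allDataEmitted;
    }
}
```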



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplitSerializer.java:
##
@@ -68,7 +70,11 @@ public ValuesSourcePartitionSplit deserialize(int version, byte[] serialized)
 String value = in.readUTF();
 partition.put(key, value);
 }
-ValuesSourcePartitionSplit split = new ValuesSourcePartitionSplit(partition);
+final boolean isInfinite = in.readBoolean();
+ValuesSourcePartitionSplit split =
+new ValuesSourcePartitionSplit(
+partition,
+isInfinite ? FinishingLogic.INFINITE : FinishingLogic.FINITE);

Review Comment:
   We have a lot of these if checks. Maybe introduce a method in `FinishingLogic` for serialization and deserialization?
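A sketch of such helpers, assuming the split serializer keeps writing a single boolean that means "is infinite" (matching the `in.readBoolean()` in the deserializer above). `fromFinite`, `writeTo` and `readFrom` are hypothetical names, not existing methods.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

enum FinishingLogic {
    FINITE,
    INFINITE;

    /** Maps the boolean "finite" option to the enum in one place. */
    static FinishingLogic fromFinite(boolean finite) {
        return finite ? FINITE : INFINITE;
    }

    /** Writes a single boolean meaning "is infinite". */
    void writeTo(DataOutput out) throws IOException {
        out.writeBoolean(this == INFINITE);
    }

    /** Reads back what writeTo wrote. */
    static FinishingLogic readFrom(DataInput in) throws IOException {
        return in.readBoolean() ? INFINITE : FINITE;
    }
}
```

Call sites would then reduce to `FinishingLogic.readFrom(in)` and `finishingLogic.writeTo(out)`, so the boolean mapping is defined once.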



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##
@@ -313,6 +315,9 @@ private static RowKind parseRowKind(String rowKindShortString) {
 private static final ConfigOption<Boolean> BOUNDED =
 ConfigOptions.key("bounded").booleanType().defaultValue(false);
 
+private static final ConfigOption<Boolean> FINITE =
+ConfigOptions.key("finite").booleanType().defaultValue(true);

Review Comment:
   I wonder if `terminating` would have been a better word, because finite sounds like a synonym for bounded, whereas terminating refers to the runtime behavior. But just a suggestion.



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##
@@ -313,6 +315,9 @@ private static RowKind parseRowKind(String rowKindShortString) {
 private static final ConfigOption<Boolean> BOUNDED =
 ConfigOptions.key("bounded").booleanType().defaultValue(false);
 
+private static final ConfigOption<Boolean> FINITE =
+ConfigOptions.key("finite").booleanType().defaultValue(true);

Review Comment:
   Maybe add a description to the config option, especially explaining finite vs. bounded.




Re: [PR] [FLINK-33375] Implement restore test base [flink]

2023-10-26 Thread via GitHub


flinkbot commented on PR #23603:
URL: https://github.com/apache/flink/pull/23603#issuecomment-1781372184

   
   ## CI report:
   
   * d8d49116263be7f4819fec276692ee59ef5f9003 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-33375] Implement restore test base [flink]

2023-10-26 Thread via GitHub


dawidwys opened a new pull request, #23603:
URL: https://github.com/apache/flink/pull/23603

   
   ## What is the purpose of the change
   
   This introduces a test base for writing restore tests.
   
   ## Verifying this change
   
   It contains a single test as an example for a simple calc.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
   

