Re: [PR] [FLINK-33375] Implement restore test base [flink]
dawidwys closed pull request #23603: [FLINK-33375] Implement restore test base URL: https://github.com/apache/flink/pull/23603 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33375] Implement restore test base [flink]
dawidwys commented on PR #23603: URL: https://github.com/apache/flink/pull/23603#issuecomment-1783097478 I'll squash all commits that do not start with `[xxx]` once the CI is green and the PR is approved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33375] Implement restore test base [flink]
dawidwys commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374631884 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java: ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */ +public class CalcTestProgram { + +static final TableTestProgram SIMPLE_CALC = +TableTestProgram.of("simple-calc", "Simple calc with sources and sinks") +.runSql("INSERT INTO sink_t SELECT a + 1, b FROM t") +.setupTableSource("t") +.withSchema("a BIGINT", "b DOUBLE") +.withValuesBeforeRestore(Row.of(420L, 42.0)) +.withValuesAfterRestore(Row.of(421L, 42.1)) +.complete() +.setupTableSink("sink_t") +.withSchema("a BIGINT", "b DOUBLE") +.withValuesBeforeRestore(Row.of(421L, 42.0)) +.withValuesAfterRestore(Row.of(421L, 42.0), Row.of(422L, 42.1)) Review Comment: sure, I'll do -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33375] Implement restore test base [flink]
dawidwys commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374632320 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.jobgraph.RestoreMode; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.table.api.CompiledPlan; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.PlanReference; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; +import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.SqlTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.table.test.program.TableTestProgramRunner; +import org.apache.flink.table.test.program.TestStep.TestKind; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for implementing restore tests for {@link ExecNode}.You can generate json compiled + * plan and a savepoint for the latest node version by running {@link + * RestoreTestBase#generateTestSetupFiles(TableTestProgram)} which is disabled by default. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class RestoreTestBase implements TableTestProgramRunner { + +private final Class execNodeUnderTest; + +protected RestoreTestBase(Class execNodeUnderTest) { +this.execNodeUnderTest = execNodeUnderTest; +} + +@Override +public EnumSet supportedSetupSteps() { +return EnumSet.of(TestKind.SOURCE_WITH_RESTORE_DATA, TestKind.SINK_WITH_RESTORE_DATA); +} + +@Override +public EnumSet supportedRunSteps() { +return EnumSet.of(TestKind.SQL); +} + +private @TempDir Path tmpDir; + +private List getVersions() { +return ExecNodeMetadataUtil.extractMetadataFromAnnotation(execNodeUnderTest).stream() +.map(ExecNodeMetadata::version) +.collect(Collectors.toList()); +} + +private int getCurrentVersion() { +return ExecNodeMetadataUtil.latestAnnotation(execNodeUnderTest).version(); +} + +private Stream createSpecs() { +return getVersions().stream() +.flatMap( +version -> supportedPrograms().stream().map(p -> Arguments.of(version, p))); +} + +/** + * Execute this test to generate test files. Remember to be using the correct branch when + * generating the test files. + */ +@Disabled +@ParameterizedTest +@MethodSource("supportedPrograms") +
Re: [PR] [FLINK-33375] Implement restore test base [flink]
twalthr commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374547838 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.jobgraph.RestoreMode; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.table.api.CompiledPlan; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.PlanReference; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; +import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.SqlTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.table.test.program.TableTestProgramRunner; +import org.apache.flink.table.test.program.TestStep.TestKind; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for implementing restore tests for {@link ExecNode}.You can generate json compiled + * plan and a savepoint for the latest node version by running {@link + * RestoreTestBase#generateTestSetupFiles(TableTestProgram)} which is disabled by default. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class RestoreTestBase implements TableTestProgramRunner { + +private final Class execNodeUnderTest; + +protected RestoreTestBase(Class execNodeUnderTest) { +this.execNodeUnderTest = execNodeUnderTest; +} + +@Override +public EnumSet supportedSetupSteps() { +return EnumSet.of(TestKind.SOURCE_WITH_RESTORE_DATA, TestKind.SINK_WITH_RESTORE_DATA); +} + +@Override +public EnumSet supportedRunSteps() { +return EnumSet.of(TestKind.SQL); +} + +private @TempDir Path tmpDir; + +private List getVersions() { +return ExecNodeMetadataUtil.extractMetadataFromAnnotation(execNodeUnderTest).stream() +.map(ExecNodeMetadata::version) +.collect(Collectors.toList()); +} + +private int getCurrentVersion() { +return ExecNodeMetadataUtil.latestAnnotation(execNodeUnderTest).version(); +} + +private Stream createSpecs() { +return getVersions().stream() +.flatMap( +version -> supportedPrograms().stream().map(p -> Arguments.of(version, p))); +} + +/** + * Execute this test to generate test files. Remember to be using the correct branch when + * generating the test files. + */ +@Disabled +@ParameterizedTest +@MethodSource("supportedPrograms") +
Re: [PR] [FLINK-33375] Implement restore test base [flink]
twalthr commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374546134 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java: ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */ +public class CalcTestProgram { + +static final TableTestProgram SIMPLE_CALC = +TableTestProgram.of("simple-calc", "Simple calc with sources and sinks") +.runSql("INSERT INTO sink_t SELECT a + 1, b FROM t") +.setupTableSource("t") +.withSchema("a BIGINT", "b DOUBLE") +.withValuesBeforeRestore(Row.of(420L, 42.0)) +.withValuesAfterRestore(Row.of(421L, 42.1)) +.complete() +.setupTableSink("sink_t") +.withSchema("a BIGINT", "b DOUBLE") +.withValuesBeforeRestore(Row.of(421L, 42.0)) +.withValuesAfterRestore(Row.of(421L, 42.0), Row.of(422L, 42.1)) Review Comment: But could we implicitly add the other list to the restore list. Defining the same data twice does not improve readability. We can also add information to the JavaDoc of the builder to make this clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33375] Implement restore test base [flink]
dawidwys commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374485571 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java: ## @@ -613,8 +610,13 @@ public void writeRecord(RowData value) throws IOException { assertThat(row).isNotNull(); synchronized (LOCK) { localRawResult.add(row); -notifyRawResultsObservers( -tableName, getRuntimeContext().getIndexOfThisSubtask(), localRawResult); + Optional.ofNullable(localRawResultsObservers.get(tableName)) +.orElse(Collections.emptyList()) +.forEach( +c -> +c.accept( + getRuntimeContext().getIndexOfThisSubtask(), +localRawResult)); Review Comment: This class does not have a common parent with the 3 above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33375] Implement restore test base [flink]
dawidwys commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374466579 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.jobgraph.RestoreMode; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.table.api.CompiledPlan; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.PlanReference; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; +import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.SqlTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.table.test.program.TableTestProgramRunner; +import org.apache.flink.table.test.program.TestStep.TestKind; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for implementing restore tests for {@link ExecNode}.You can generate json compiled + * plan and a savepoint for the latest node version by running {@link + * RestoreTestBase#generateTestSetupFiles(TableTestProgram)} which is disabled by default. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class RestoreTestBase implements TableTestProgramRunner { Review Comment: I used `@ExtendWith(MiniClusterExtension.class)` which is junit 5 equivalent -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33375] Implement restore test base [flink]
dawidwys commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374442591 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java: ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */ +public class CalcTestProgram { + +static final TableTestProgram SIMPLE_CALC = +TableTestProgram.of("simple-calc", "Simple calc with sources and sinks") +.runSql("INSERT INTO sink_t SELECT a + 1, b FROM t") +.setupTableSource("t") +.withSchema("a BIGINT", "b DOUBLE") +.withValuesBeforeRestore(Row.of(420L, 42.0)) +.withValuesAfterRestore(Row.of(421L, 42.1)) +.complete() +.setupTableSink("sink_t") +.withSchema("a BIGINT", "b DOUBLE") +.withValuesBeforeRestore(Row.of(421L, 42.0)) +.withValuesAfterRestore(Row.of(421L, 42.0), Row.of(422L, 42.1)) Review Comment: It should. The sink is implemented in a way that stores all seen records in the state. I believe this is also helpful, because it actually checks the savepoint has been restored. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33375] Implement restore test base [flink]
dawidwys commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374420759 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java: ## @@ -879,6 +890,7 @@ private TestValuesScanTableSourceWithoutProjectionPushDown( DataType producedDataType, ChangelogMode changelogMode, boolean bounded, +boolean finite, Review Comment: No it was to be consistent with bounded. I did not want to change the class too much. I'll do that though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33375] Implement restore test base [flink]
dawidwys commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374420333 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java: ## @@ -981,16 +998,25 @@ public boolean isBounded() { case "NewSource": checkArgument( !failingSource, "Values Source doesn't support as failing new source."); +final FinishingLogic finishingLogic = +finite ? FinishingLogic.FINITE : FinishingLogic.INFINITE; +final Boundedness boundedness = Review Comment: added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33375] Implement restore test base [flink]
dawidwys commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374384507 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.jobgraph.RestoreMode; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.table.api.CompiledPlan; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.PlanReference; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; +import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.SqlTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.table.test.program.TableTestProgramRunner; +import org.apache.flink.table.test.program.TestStep.TestKind; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for implementing restore tests for {@link ExecNode}.You can generate json compiled + * plan and a savepoint for the latest node version by running {@link + * RestoreTestBase#generateTestSetupFiles(TableTestProgram)} which is disabled by default. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class RestoreTestBase implements TableTestProgramRunner { + +private final Class execNodeUnderTest; + +protected RestoreTestBase(Class execNodeUnderTest) { +this.execNodeUnderTest = execNodeUnderTest; +} + +@Override +public EnumSet supportedSetupSteps() { +return EnumSet.of(TestKind.SOURCE_WITH_RESTORE_DATA, TestKind.SINK_WITH_RESTORE_DATA); +} + +@Override +public EnumSet supportedRunSteps() { +return EnumSet.of(TestKind.SQL); +} + +private @TempDir Path tmpDir; + +private List getVersions() { +return ExecNodeMetadataUtil.extractMetadataFromAnnotation(execNodeUnderTest).stream() +.map(ExecNodeMetadata::version) +.collect(Collectors.toList()); +} + +private int getCurrentVersion() { +return ExecNodeMetadataUtil.latestAnnotation(execNodeUnderTest).version(); +} + +private Stream createSpecs() { +return getVersions().stream() +.flatMap( +version -> supportedPrograms().stream().map(p -> Arguments.of(version, p))); +} + +/** + * Execute this test to generate test files. Remember to be using the correct branch when + * generating the test files. + */ +@Disabled +@ParameterizedTest +@MethodSource("supportedPrograms") +
Re: [PR] [FLINK-33375] Implement restore test base [flink]
twalthr commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374365391 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java: ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */ +public class CalcTestProgram { Review Comment: make it plural ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java: ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */ +public class CalcTestProgram { + +static final TableTestProgram SIMPLE_CALC = +TableTestProgram.of("simple-calc", "Simple calc with sources and sinks") +.runSql("INSERT INTO sink_t SELECT a + 1, b FROM t") Review Comment: nit: runSql at the end of the builder? ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/CalcTestProgram.java: ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecCalc}. */ +public class CalcTestProgram { + +static final TableTestProgram SIMPLE_CALC = +TableTestProgram.of("simple-calc", "Simple calc with sources and sinks") Review Comment: follow the guidelines in `TableTestProgram.of` maybe we need automated checks for this :D ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java: ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
Re: [PR] [FLINK-33375] Implement restore test base [flink]
twalthr commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374362240 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java: ## @@ -378,6 +395,8 @@ public void invoke(RowData value, Context context) throws Exception { } synchronized (LOCK) { localRawResult.add((Row) converter.toExternal(value)); +notifyRawResultsObservers( Review Comment: method is called 4 times with the same args? maybe create a method in super class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33375] Implement restore test base [flink]
twalthr commented on code in PR #23603: URL: https://github.com/apache/flink/pull/23603#discussion_r1374350175 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java: ## @@ -981,16 +998,25 @@ public boolean isBounded() { case "NewSource": checkArgument( !failingSource, "Values Source doesn't support as failing new source."); +final FinishingLogic finishingLogic = +finite ? FinishingLogic.FINITE : FinishingLogic.INFINITE; +final Boundedness boundedness = Review Comment: do we have a check somewhere that infinite + bounded makes no sense? ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java: ## @@ -879,6 +890,7 @@ private TestValuesScanTableSourceWithoutProjectionPushDown( DataType producedDataType, ChangelogMode changelogMode, boolean bounded, +boolean finite, Review Comment: Use the enum here as well? Also for bounded? Or is it due to the serialization? ## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/FinishingLogic.java: ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.source; + +import org.apache.flink.table.planner.factories.TestValuesTableFactory; + +/** + * Tells sources created from {@link TestValuesTableFactory} how they should behave after producing + * all data. Review Comment: This might be confusing for people. Explain that "even if the source is declared unbounded, we can internally shut it down"? ## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplitSerializer.java: ## @@ -68,7 +70,11 @@ public ValuesSourcePartitionSplit deserialize(int version, byte[] serialized) String value = in.readUTF(); partition.put(key, value); } -ValuesSourcePartitionSplit split = new ValuesSourcePartitionSplit(partition); +final boolean isInfinite = in.readBoolean(); +ValuesSourcePartitionSplit split = +new ValuesSourcePartitionSplit( +partition, +isInfinite ? FinishingLogic.INFINITE : FinishingLogic.FINITE); Review Comment: We have a lot of these if checks. Maybe introduce a method in `FinishingLogic` serialization and deserialization? ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java: ## @@ -313,6 +315,9 @@ private static RowKind parseRowKind(String rowKindShortString) { private static final ConfigOption BOUNDED = ConfigOptions.key("bounded").booleanType().defaultValue(false); +private static final ConfigOption FINITE = +ConfigOptions.key("finite").booleanType().defaultValue(true); Review Comment: I wonder if `terminating` would have been a better word. Because finite sounds like a synonym to bounded. Where terminating refers to the runtime behavior. But just a suggestion. ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java: ## @@ -313,6 +315,9 @@ private static RowKind parseRowKind(String rowKindShortString) { private static final ConfigOption BOUNDED = ConfigOptions.key("bounded").booleanType().defaultValue(false); +private static final ConfigOption FINITE = +ConfigOptions.key("finite").booleanType().defaultValue(true); Review Comment: maybe add a description to the config option. esp explain finite vs bounded -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service,
Re: [PR] [FLINK-33375] Implement restore test base [flink]
flinkbot commented on PR #23603: URL: https://github.com/apache/flink/pull/23603#issuecomment-1781372184 ## CI report: * d8d49116263be7f4819fec276692ee59ef5f9003 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-33375] Implement restore test base [flink]
dawidwys opened a new pull request, #23603: URL: https://github.com/apache/flink/pull/23603 ## What is the purpose of the change This introduces a test base for writing restore tests. ## Verifying this change It contains a single test as an example for a simple calc. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org