Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2172535386

> Yeah, the test failure is unrelated ([FLINK-35042](https://issues.apache.org/jira/browse/FLINK-35042)). Unfortunately, we missed getting it in before the feature freeze. Let's merge it after the release branch for 1.20 is created.

Ah snap, gonna stay tuned on this then. I will create the JIRA for the "on-the-fly" processing after merging, as stated above.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2171654110

Yeah, the test failure is unrelated (FLINK-35042). Unfortunately, we missed getting it in before the feature freeze. Let's merge it after the release branch for 1.20 is created.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2165511898

@flinkbot run azure
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1637750136

## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
I still think we should keep the on-the-fly implementation. But we can move forward for now and modify it once we actually run into a performance issue.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1632637479

## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
@JingGe any objections? The refactoring should be ok considering that the amount of data involved is quite low. The actual migration from bash to Java is also done in a separate commit, which enables us to revert if we feel it's necessary. WDYT?
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1627696194

## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
@JingGe @XComp Thank you for the feedback!

@XComp I would merge this as it is. In the background I was already working on something similar; I will create another issue for adding a test source for batch tests and for the Table API.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1627611020

## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
> Got it, the number of records is not huge, that's why I did not mention that

True - that's a valid point. I didn't check the number of elements as part of my last comment. I leave the decision up to you whether it's done in a new PR or part of this PR.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1627390009

## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
Sounds great! Please feel free to create a follow-up ticket and contribute the new generator with a new PR.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1627196144

## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
@JingGe @XComp Got it. The number of records is not huge, that's why I did not mention that. However, I understand your concerns as well.

At this point I would write another generator as part of this PR. However, I would provide that as part of test-utils rather than confining it to batch, as other tests could benefit from it. What do you guys think?
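
For a sense of scale behind "the number of records is not huge", the row count of such a generator can be estimated from its parameters. The sketch below is only an illustration: the class name `RowCountEstimate`, the sample parameter values, and the assumption that every time step emits exactly one row per key are not taken from the PR (the quoted hunk is truncated before the loop body ends).

```java
/** Hypothetical helper, not part of the PR: estimates how many rows get materialized. */
final class RowCountEstimate {

    /**
     * Assumes the loop advances time by stepMs while ms < durationMs
     * and emits one row per key at each step.
     */
    static long estimateRows(int numKeys, float rowsPerKeyAndSecond, int durationSeconds) {
        final long stepMs = (long) (1000 / rowsPerKeyAndSecond);
        if (numKeys <= 0 || stepMs <= 0) {
            throw new IllegalArgumentException("numKeys and stepMs must be positive");
        }
        final long durationMs = durationSeconds * 1000L;
        // Number of time steps visited by "while (ms < durationMs)": ceiling division.
        final long steps = (durationMs + stepMs - 1) / stepMs;
        return steps * numKeys;
    }

    public static void main(String[] args) {
        // Sample values chosen for illustration only.
        System.out.println(estimateRows(10, 2.0f, 60));
    }
}
```

Under these assumptions the materialized list grows linearly with `numKeys`, the rate, and the duration, so small test parameters keep it small, which matches the argument made in the comment above.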
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1626077906

## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
Jing actually has a good point on the memory consumption. I missed that one. We should continue generating the records on-the-fly to be closer to what the original test did.
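
The trade-off discussed in this thread (materializing all rows in a list versus producing them on the fly) can be sketched in plain Java. This is a minimal illustration and not the PR's code: the `RowSequence` class, the `long[]` stand-in for a row, and the stepping rule (cycle through all keys, then advance the timestamp) are assumptions, since the quoted hunk is cut off before the loop body ends.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical sketch: the same (key, timestamp) sequence, produced lazily or eagerly. */
final class RowSequence implements Iterable<long[]> {
    private final int numKeys;
    private final long stepMs;
    private final long durationMs;

    RowSequence(int numKeys, float rowsPerKeyAndSecond, int durationSeconds) {
        this.numKeys = numKeys;
        this.stepMs = (long) (1000 / rowsPerKeyAndSecond);
        this.durationMs = durationSeconds * 1000L;
        if (numKeys <= 0 || stepMs <= 0) {
            throw new IllegalArgumentException("numKeys and stepMs must be positive");
        }
    }

    /** On-the-fly variant: only the current key index and timestamp are held in memory. */
    @Override
    public Iterator<long[]> iterator() {
        return new Iterator<long[]>() {
            private int keyIndex = 0;
            private long ms = 0;

            @Override
            public boolean hasNext() {
                return ms < durationMs;
            }

            @Override
            public long[] next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                long[] row = {keyIndex, ms};
                // Assumed stepping rule: visit every key, then move time forward.
                if (++keyIndex >= numKeys) {
                    keyIndex = 0;
                    ms += stepMs;
                }
                return row;
            }
        };
    }

    /** Materialized variant: memory use grows linearly with the number of rows. */
    List<long[]> materialize() {
        List<long[]> rows = new ArrayList<>();
        for (long[] row : this) {
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        for (long[] row : new RowSequence(2, 1.0f, 3)) {
            System.out.println("key=" + row[0] + " ms=" + row[1]);
        }
    }
}
```

Both variants yield the same rows in the same order; the difference is only whether the full sequence exists in memory at once, which is the concern raised about the list-based `Generator`.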
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1624540081

## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
The original implementation was only for batch in `BatchSQLTestProgram`. This PR is for the migration, which should not implicitly bring a big change to the data generation that might cause performance issues later. In addition, the new implementation is still in the `flink-batch-sql-test` module, which should be used only for batch. Not sure if there are already similar generators in the stream-sql-test. If not, a new Jira task could be created to add the new implementation to the stream SQL test.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1624446134 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier<RowData> {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List<Row> elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List<Row> elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
In other words, I could write another implementation of the bounded source without any fault-tolerance guarantee.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1624443303 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier<RowData> {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List<Row> elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List<Row> elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
@JingGe Yeah, nothing strictly related to this case. `FromElementsSource` is generic and is also used in the streaming case. Here I am using it in batch table mode. I am reusing it simply because that is possible.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
JingGe commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1624191535 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier<RowData> {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List<Row> elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List<Row> elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
Thanks for the reply. Commonly, batch processing does not rely on offsets. Would you please help me understand why the source should be fault-tolerant and require getting records by offset for batch?
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
JingGe commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1622617735 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier<RowData> {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List<Row> elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List<Row> elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
The new implementation will consume more memory than the old one, which generated each `row` iteratively on the fly. This could become an issue for batch tests with large data volumes.
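To make the memory concern above concrete, here is a minimal sketch contrasting the two styles. It is not the PR's code: the class name `RowGenerationSketch`, both method names, and the use of plain strings instead of Flink `Row` objects are assumptions chosen so the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for the generator; rows are plain strings, not Flink Rows. */
class RowGenerationSketch {

    /** Lazy variant: keeps only two counters and builds each row on demand. */
    static Iterator<String> lazyRows(int numKeys, long durationMs, long stepMs) {
        return new Iterator<String>() {
            private int keyIndex = 0;
            private long ms = 0;

            @Override
            public boolean hasNext() {
                return ms < durationMs;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String row = keyIndex + "@" + ms;
                // Cycle through all keys before advancing the timestamp.
                if (++keyIndex >= numKeys) {
                    keyIndex = 0;
                    ms += stepMs;
                }
                return row;
            }
        };
    }

    /** Eager variant: drains the same sequence into a list, so memory grows with the row count. */
    static List<String> eagerRows(int numKeys, long durationMs, long stepMs) {
        List<String> rows = new ArrayList<>();
        lazyRows(numKeys, durationMs, stepMs).forEachRemaining(rows::add);
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(eagerRows(2, 2000L, 1000L));
    }
}
```

Both variants yield the same sequence. The eager one holds `numKeys * ceil(durationMs / stepMs)` rows at once, which is the trade-off being discussed for large batch inputs.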
## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ## @@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;

-/**
- * End-to-end test for batch SQL queries.
- *
- * The sources are generated and bounded. The result is always constant.
- *
- * Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class);
+
+    private static final Path sqlPath =
+            ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+    @TempDir private Path tmp;
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    private Path result;
+
+    @BeforeEach
+    public void before() {
+        this.result = tmp.resolve(String.format("result-%s", UUID.randomUUID()));
+        LOG.info("Results for this test will be stored at: {}", this.result);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2139041576

@XComp Done! Don't worry in any case, I loved the review process. This is my first contribution, and this is part of learning for the next ones.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2138886095

One final thing: I wasn't able to do it myself somehow. Can you change the commit message prefix from `[refactor]` to `[FLINK-20398]`? "refactor" isn't a prefix the Flink community usually uses.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2138868656

I'm not gonna wait for another CI round. Looks like the CI bot didn't pick up the rerun command. Anyway, I verified that the test ran (see [logs](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59935&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=14026)).
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2137605027

@flinkbot run azure
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2137601891

CI test failure is unrelated: FLINK-34513
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1618480332 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier<RowData> {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List<Row> elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+        InternalGenerator gen =
+                new InternalGenerator(
+                        numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L);
+        List<Row> elements = new ArrayList<>();
+        gen.forEachRemaining(elements::add);
+        return new Generator(elements);
+    }

Review Comment:
You're right, I got a bit carried away. We shouldn't optimize it any further. But let's keep the changes as they are now, since you have already applied them.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1618451567 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier<RowData> {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List<Row> elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+        InternalGenerator gen =
+                new InternalGenerator(
+                        numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L);
+        List<Row> elements = new ArrayList<>();
+        gen.forEachRemaining(elements::add);
+        return new Generator(elements);
+    }

Review Comment:
I applied your suggestion, as I like it.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1618443253 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier<RowData> {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List<Row> elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+        InternalGenerator gen =
+                new InternalGenerator(
+                        numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L);
+        List<Row> elements = new ArrayList<>();
+        gen.forEachRemaining(elements::add);
+        return new Generator(elements);
+    }

Review Comment:
I did not want to change the original logic of the generator itself too much, but it makes sense.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1616241149 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier<RowData> {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List<Row> elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+        InternalGenerator gen =
+                new InternalGenerator(
+                        numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L);
+        List<Row> elements = new ArrayList<>();
+        gen.forEachRemaining(elements::add);
+        return new Generator(elements);
+    }

Review Comment:
```suggestion
    static Generator create(
            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
        final long durationMs = durationSeconds * 1000L;
        final long offsetMs = offsetSeconds * 2000L;
        final List<Row> elements = new ArrayList<>();
        int keyIndex = 0;
        long ms = 0;
        while (ms < durationMs) {
            elements.add(createRow(keyIndex++, ms, offsetMs));
            if (keyIndex >= numKeys) {
                keyIndex = 0;
                ms += stepMs;
            }
        }
        return new Generator(elements);
    }

    private static Row createRow(int keyIndex, long milliseconds, long offsetMillis) {
        return Row.of(
                keyIndex,
                LocalDateTime.ofInstant(
                        Instant.ofEpochMilli(milliseconds + offsetMillis), ZoneOffset.UTC),
                "Some payload...");
    }
```

nit: What we could also do is get rid of the `InternalGenerator` class. It's just a while loop in the end. WDYT?
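For anyone who wants to try the suggested loop outside Flink, the following is a rough, self-contained sketch. The loop body mirrors the suggestion above, but `WhileLoopGeneratorSketch`, `SimpleRow`, and `generate` are hypothetical names, and `SimpleRow` only stands in for Flink's `Row`.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

class WhileLoopGeneratorSketch {

    /** Hypothetical plain-Java replacement for Flink's Row, used only in this sketch. */
    static final class SimpleRow {
        final int key;
        final LocalDateTime timestamp;
        final String payload;

        SimpleRow(int key, LocalDateTime timestamp, String payload) {
            this.key = key;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    static List<SimpleRow> generate(
            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
        final long durationMs = durationSeconds * 1000L;
        final long offsetMs = offsetSeconds * 2000L; // factor copied from the quoted diff
        final List<SimpleRow> rows = new ArrayList<>();
        int keyIndex = 0;
        long ms = 0;
        while (ms < durationMs) {
            rows.add(
                    new SimpleRow(
                            keyIndex++,
                            LocalDateTime.ofInstant(
                                    Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC),
                            "Some payload..."));
            // Emit one row per key for the current timestamp, then advance by one step.
            if (keyIndex >= numKeys) {
                keyIndex = 0;
                ms += stepMs;
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // 3 keys at 2 rows per key and second for 1 second: two timestamps, three keys each.
        System.out.println(generate(3, 2.0f, 1, 0).size());
    }
}
```

Note that `stepMs` becomes 0 when `rowsPerKeyAndSecond` exceeds 1000, in which case the loop would never terminate. The sketch does not guard against that, and neither does the suggestion it mirrors.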
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2127135299

@XComp Hello! The final touches are done and your comments are addressed.

I added the capability for `FromElementsSource` to accept an `ElementsSupplier` at init phase. The reason I previously had to implement a `Serializable` class extending `RowData` was that `FromElementsSource` had a field `List<OUT> elements`, where `OUT` can be non-serializable (as is the case for `RowData`), so the operator could not be serialized when the job was starting. I made it accept an `ElementsSupplier extends Serializable` so that it is clear that the supplier must be serializable.

In my use case, I simply kept the previous implementation based on `Row` (which is serializable) and convert each element to `RowData` on `get`. No class now has `RowData` fields that would prevent its serialization.
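The serialization point above can be illustrated without Flink. The sketch below is only an analogy under stated assumptions: the nested `ElementsSupplier` interface, `Converted` (standing in for a non-serializable type such as `RowData`), `ConvertingSupplier`, and `roundTrip` are all hypothetical names and are not the classes from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class SerializableSupplierSketch {

    /** Hypothetical analogue of a supplier interface that is serializable by contract. */
    interface ElementsSupplier<T> extends Serializable {
        List<T> get();
    }

    /** Stand-in for a non-serializable runtime type. */
    static final class Converted {
        final String value;

        Converted(String value) {
            this.value = value;
        }
    }

    /** Stores only serializable state (strings) and converts lazily in get(). */
    static final class ConvertingSupplier implements ElementsSupplier<Converted> {
        private static final long serialVersionUID = 1L;
        private final ArrayList<String> raw;

        ConvertingSupplier(List<String> raw) {
            this.raw = new ArrayList<>(raw);
        }

        @Override
        public List<Converted> get() {
            List<Converted> out = new ArrayList<>();
            for (String s : raw) {
                out.add(new Converted(s));
            }
            return out;
        }
    }

    /** Round-trips an object through Java serialization, wrapping checked exceptions. */
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        List<String> raw = new ArrayList<>();
        raw.add("a");
        raw.add("b");
        ConvertingSupplier copy = roundTrip(new ConvertingSupplier(raw));
        System.out.println(copy.get().size());
    }
}
```

Had `ConvertingSupplier` stored a `List<Converted>` field directly, `writeObject` would fail with a `NotSerializableException`, which is roughly the situation described for the old `List<OUT> elements` field.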
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1609692387 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.table.data.RowData; + +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.NoSuchElementException; + +class Generator implements Iterator, Iterable { +final int numKeys; + +private int keyIndex = 0; + +private final long durationMs; +private final long stepMs; +private final long offsetMs; +private long ms = 0; + +static Generator create( +int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { +int sleepMs = (int) (1000 / rowsPerKeyAndSecond); +return new Generator(numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L); +} + +Generator(int numKeys, long durationMs, long stepMs, long offsetMs) { +this.numKeys = numKeys; +this.durationMs = durationMs; +this.stepMs = stepMs; +this.offsetMs = offsetMs; +} + +@Override +public boolean hasNext() { +return ms < durationMs; +} + +@Override +public RowData next() { +if (!hasNext()) { +throw new NoSuchElementException(); +} +RowData row = +new GeneratedRow( +keyIndex, +LocalDateTime.ofInstant( +Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC), +"Some payload..."); +++keyIndex; +if (keyIndex >= numKeys) { +keyIndex = 0; +ms += stepMs; +} +return row; +} + +@NotNull Review Comment: I think this falls in this case: > but can be used in certain cases to override a previous annotation, As `iterator` overrides: ```java public interface Iterable { /** * Returns an iterator over elements of type {@code T}. * * @return an Iterator. */ @NotNull Iterator iterator(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1609692387 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.table.data.RowData; + +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.NoSuchElementException; + +class Generator implements Iterator, Iterable { +final int numKeys; + +private int keyIndex = 0; + +private final long durationMs; +private final long stepMs; +private final long offsetMs; +private long ms = 0; + +static Generator create( +int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { +int sleepMs = (int) (1000 / rowsPerKeyAndSecond); +return new Generator(numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L); +} + +Generator(int numKeys, long durationMs, long stepMs, long offsetMs) { +this.numKeys = numKeys; +this.durationMs = durationMs; +this.stepMs = stepMs; +this.offsetMs = offsetMs; +} + +@Override +public boolean hasNext() { +return ms < durationMs; +} + +@Override +public RowData next() { +if (!hasNext()) { +throw new NoSuchElementException(); +} +RowData row = +new GeneratedRow( +keyIndex, +LocalDateTime.ofInstant( +Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC), +"Some payload..."); +++keyIndex; +if (keyIndex >= numKeys) { +keyIndex = 0; +ms += stepMs; +} +return row; +} + +@NotNull Review Comment: I think this falls in this case: > but can be used in certain cases to override a previous annotation, As `iterator` overrides: ```java public interface Iterable { /** * Returns an iterator over elements of type {@code T}. * * @return an Iterator. */ Iterator iterator(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1609688204 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/GeneratedRow.java: ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.Objects; + +class GeneratedRow implements RowData, Serializable { Review Comment: The problem is that, in order to avoid using deprecated functions and the like, you need to use `static SourceProvider of(Source<RowData, ?, ?> source, @Nullable Integer sourceParallelism)`, which forces you to use `RowData`. Anyhow, I am going to dive deeper into this and see whether this is the only solution.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1600075906 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/GeneratedRow.java: ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.Objects; + +class GeneratedRow implements RowData, Serializable { Review Comment: Was there a reason to implement a dedicated `RowData`subclass rather than using the `Row`class like it's done in the original test code? 
🤔 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.table.data.RowData; + +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.NoSuchElementException; + +class Generator implements Iterator<RowData>, Iterable<RowData> { +final int numKeys; Review Comment: ```suggestion private final int numKeys; ``` There is no real reason to make this field package-private, is there? 🤔 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.table.data.RowData; + +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.NoSuchElementException; + +class Generator implements Iterator, Iterable { +final int numKeys; + +private int keyIndex = 0; + +private final long durationMs; +private final long stepMs; +private final long offsetMs; +private long ms = 0; + +static Generator create( +int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { +int sleepMs = (int) (1000 / rowsPerKeyAndSecond); +return
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
morazow commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1563396268 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ## @@ -114,104 +129,24 @@ public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) thr jobClient.getJobExecutionResult().get(); final String expected = -"1980,1970-01-01 00:00:00.0\n" -+ "1980,1970-01-01 00:00:20.0\n" -+ "1980,1970-01-01 00:00:40.0\n"; +"1980,1970-01-01 00:00:00\n" Review Comment: Got it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1562170708 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.table.data.RowData; + +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.NoSuchElementException; + +class Generator implements Iterator, Iterable { +final int numKeys; + +private int keyIndex = 0; + +private final long durationMs; +private final long stepMs; +private final long offsetMs; +private long ms = 0; + +static Generator create( +int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { +int sleepMs = (int) (1000 / rowsPerKeyAndSecond); +return new Generator(numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L); +} + +Generator(int numKeys, long durationMs, long stepMs, long offsetMs) { +this.numKeys = numKeys; +this.durationMs = durationMs; +this.stepMs = stepMs; +this.offsetMs = offsetMs; +} + +@Override +public boolean hasNext() { +return ms < durationMs; +} + +@Override +public RowData next() { +if (!hasNext()) { +throw new NoSuchElementException(); +} +RowData row = +new GeneratedRow( +keyIndex, +LocalDateTime.ofInstant( +Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC), +"Some payload..."); Review Comment: The goal of this PR is to port the test. I preserved its logic 100%, so, I left the record generation as it was in the original code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
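To make the preserved generation logic easier to follow, here is a dependency-free sketch of the same iteration scheme. It is not the PR's class: rows are plain strings instead of `GeneratedRow`, the class name `GeneratorSketch` and the `count` helper are made up for illustration, and `iterator()` returning `this` is an assumption, since the quoted hunk is cut off before that method body.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class GeneratorSketch implements Iterator<String>, Iterable<String> {
    private final int numKeys;
    private final long durationMs;
    private final long stepMs;
    private final long offsetMs;

    private int keyIndex = 0;
    private long ms = 0;

    GeneratorSketch(int numKeys, long durationMs, long stepMs, long offsetMs) {
        this.numKeys = numKeys;
        this.durationMs = durationMs;
        this.stepMs = stepMs;
        this.offsetMs = offsetMs;
    }

    @Override
    public boolean hasNext() {
        // The generator is bounded: it stops once the simulated clock reaches the duration.
        return ms < durationMs;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        LocalDateTime ts =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC);
        String row = keyIndex + "," + ts + ",Some payload...";
        // Emit one row per key for the current timestamp, then advance the clock by one step.
        ++keyIndex;
        if (keyIndex >= numKeys) {
            keyIndex = 0;
            ms += stepMs;
        }
        return row;
    }

    @Override
    public Iterator<String> iterator() {
        // Assumption: the generator acts as its own single-use iterator.
        return this;
    }

    /** Illustrative helper: drains the generator and counts the rows. */
    static int count(GeneratorSketch generator) {
        int n = 0;
        for (String ignored : generator) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        // 2 keys, 3000 ms duration, 1000 ms step: 3 timestamps with 2 rows each.
        System.out.println(count(new GeneratorSketch(2, 3000L, 1000L, 0L)));
    }
}
```

Because the row count is fully determined by `numKeys`, `durationMs` and `stepMs`, the query result is constant, which is why the test can compare against a fixed expected string.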
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1562169902 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ## @@ -114,104 +129,24 @@ public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) thr jobClient.getJobExecutionResult().get(); final String expected = -"1980,1970-01-01 00:00:00.0\n" -+ "1980,1970-01-01 00:00:20.0\n" -+ "1980,1970-01-01 00:00:40.0\n"; +"1980,1970-01-01 00:00:00\n" Review Comment: Hello! Yeah, the previous `CsvTableSink` is deprecated, so I replaced it with the `filesystem` connector. I only had to edit the expected result because of how timestamps are formatted internally: I just removed the trailing `.0`, so the values are exactly the same.
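For readers wondering where a trailing `.0` can come from, the following sketch shows one plausible source of such a difference using only the JDK. It is an assumption that the two sinks behave like these two formatters; the thread only states that the formatting differs. `java.sql.Timestamp#toString` always prints at least one fractional digit, while a seconds-only `DateTimeFormatter` pattern prints none. The class and method names are made up for illustration.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampFormatSketch {

    private static final DateTimeFormatter SECONDS_ONLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** JDBC-style rendering: always includes a fractional part, ".0" when nanos are zero. */
    static String jdbcStyle(LocalDateTime ts) {
        return Timestamp.valueOf(ts).toString();
    }

    /** Seconds-precision rendering without any fractional part. */
    static String secondsOnly(LocalDateTime ts) {
        return SECONDS_ONLY.format(ts);
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(1970, 1, 1, 0, 0, 20);
        System.out.println(jdbcStyle(ts));
        System.out.println(secondsOnly(ts));
    }
}
```

Both strings denote the same instant, so dropping `.0` from the expected output changes only the textual form of the result.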
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
morazow commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1561946899 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.table.data.RowData; + +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.NoSuchElementException; + +class Generator implements Iterator<RowData>, Iterable<RowData> { +final int numKeys; + +private int keyIndex = 0; + +private final long durationMs; +private final long stepMs; +private final long offsetMs; +private long ms = 0; + +static Generator create( +int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { +int sleepMs = (int) (1000 / rowsPerKeyAndSecond); +return new Generator(numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L); +} + +Generator(int numKeys, long durationMs, long stepMs, long offsetMs) { +this.numKeys = numKeys; +this.durationMs = durationMs; +this.stepMs = stepMs; +this.offsetMs = offsetMs; +} + +@Override +public boolean hasNext() { +return ms < durationMs; +} + +@Override +public RowData next() { +if (!hasNext()) { +throw new NoSuchElementException(); +} +RowData row = +new GeneratedRow( +keyIndex, +LocalDateTime.ofInstant( +Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC), +"Some payload..."); Review Comment: I saw this was in the original change, but should we randomize the payload? ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ## @@ -114,104 +129,24 @@ public void testBatchSQL(BatchShuffleMode shuffleMode, @TempDir Path tmpDir) thr jobClient.getJobExecutionResult().get(); final String expected = -"1980,1970-01-01 00:00:00.0\n" -+ "1980,1970-01-01 00:00:20.0\n" -+ "1980,1970-01-01 00:00:40.0\n"; +"1980,1970-01-01 00:00:00\n" Review Comment: Why is this change required?
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2049852124 @XComp Glad you got some vacation! I have finally addressed the deprecation warnings as well, by implementing a custom connector through `DynamicTableSource`. It turned out to be quite tough, probably because this is not that common, or because these new APIs are not well documented yet. I wanted to use `TableEnvironment.fromValues`, but I could not because the test was hanging... I want to understand why and, if needed, file an issue for that.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2033738415 fyi: I will be off for the rest of April and, therefore, wouldn't be able to finalize this PR. You might want to reach out to other committers or expect a delay in my responses. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1543172250 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; -/** - * End-to-end test for batch SQL queries. - * - * The sources are generated and bounded. The result is always constant. - * - * Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that - * will be executed as executeSql - */ -public class BatchSQLTestProgram { +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End-to-End tests for Batch SQL tests. 
*/ +@ExtendWith(TestLoggerExtension.class) +public class BatchSQLTest { +private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class); + +private static final Path sqlPath = +ResourceTestUtils.getResource("resources/sql-job-query.sql"); + +@TempDir private Path tmp; + +@RegisterExtension +private static final MiniClusterExtension MINI_CLUSTER = +new MiniClusterExtension( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(2) +.setNumberSlotsPerTaskManager(1) +.build()); + +private Path result; + +@BeforeEach +public void before() { +this.result = tmp.resolve(String.format("result-%s", UUID.randomUUID())); +LOG.info("Results for this test will be stored at: {}", this.result); +} + +@ParameterizedTest +@EnumSource( +value = BatchShuffleMode.class, +names = { +"ALL_EXCHANGES_BLOCKING", Review Comment: Not an expert either, but I tried and I get an `IllegalState`: > At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'. Note that for DataSet jobs which do not recognize the aforementioned shuffle mode, the ExecutionMode needs to be BATCH_FORCED to force BLOCKING shuffle -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1541160078 ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; -/** - * End-to-end test for batch SQL queries. - * - * The sources are generated and bounded. The result is always constant. - * - * Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that - * will be executed as executeSql - */ -public class BatchSQLTestProgram { +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End-to-End tests for Batch SQL tests. */ +@ExtendWith(TestLoggerExtension.class) +public class BatchSQLTest { Review Comment: ```suggestion class BatchSQLTest { ``` JUnit 5 allows test classes to be package-private. This lets you remove the JavaDoc and still comply with checkstyle. The JavaDoc itself doesn't add much value. 
## flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/log4j2-test.properties: ## @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=OFF +rootLogger.appenderRef.test.ref=TestLogger +appender.testlogger.name=TestLogger +appender.testlogger.type=CONSOLE +appender.testlogger.target=SYSTEM_ERR +appender.testlogger.layout.type=PatternLayout +appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n +# Uncomment to enable codegen logging +#loggers = testlogger +#logger.testlogger.name =org.apache.flink.table.planner.codegen +#logger.testlogger.level = TRACE +#logger.testlogger.appenderRefs = TestLogger Review Comment: ```suggestion # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes rootLogger.level=OFF rootLogger.appenderRef.test.ref=TestLogger appender.testlogger.name=TestLogger appender.testlogger.type=CONSOLE appender.testlogger.target=SYSTEM_ERR appender.testlogger.layout.type=PatternLayout appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n # Uncomment to enable codegen logging #loggers = testlogger #logger.testlogger.name 
=org.apache.flink.table.planner.codegen #logger.testlogger.level = TRACE #logger.testlogger.appenderRefs = TestLogger ``` Then let's add some empty lines in between ## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ## @@ -34,66 +35,105 @@ import org.apache.flink.table.sinks.CsvTableSink; import org.apache.flink.table.sources.InputFormatTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir;
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2018234458 @XComp everything should be ok now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2007495060 @XComp This required quite an effort, honestly, but here we are with the JUnit5 version of what I had before. It also means there is no need to start a separate jar: the code is included directly in the test and runs directly against the MiniCluster. Thank you for your detailed review.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1530613302

## flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java: ##
@@ -234,10 +234,7 @@ public JobID submitJob(final JobSubmission jobSubmission, Duration timeout) thro
         LOG.info("Running {}.", commands.stream().collect(Collectors.joining(" ")));
-        final Pattern pattern =
-                jobSubmission.isDetached()
-                        ? Pattern.compile("Job has been submitted with JobID (.*)")
-                        : Pattern.compile("Job with JobID (.*) has finished.");
+        final Pattern pattern = Pattern.compile("Job has been submitted with JobID (.*)");

Review Comment: It used to be, as that string matching is apparently obsolete now. With the latest changes, this is not relevant anymore.
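For readers following the `FlinkDistribution` diff above, here is a minimal sketch of how a pattern like `Job has been submitted with JobID (.*)` can pull a job ID out of CLI output. It uses only `java.util.regex`; the class name `JobIdExtractor`, the method `extractJobId`, and the sample output lines are hypothetical and are not taken from the PR.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Flink or of this PR.
final class JobIdExtractor {
    // Same message format as the pattern kept in the diff above.
    private static final Pattern SUBMITTED =
            Pattern.compile("Job has been submitted with JobID (.*)");

    /** Returns the job ID from the first matching output line, if any line matches. */
    static Optional<String> extractJobId(List<String> outputLines) {
        for (String line : outputLines) {
            Matcher matcher = SUBMITTED.matcher(line);
            if (matcher.find()) {
                // group(1) is everything after "JobID " up to the end of the line.
                return Optional.of(matcher.group(1).trim());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Assumed sample output; real CLI output may contain other lines.
        List<String> output = List.of(
                "some unrelated log line",
                "Job has been submitted with JobID 0123456789abcdef0123456789abcdef");
        System.out.println(extractJobId(output).orElse("<no job id found>"));
    }
}
```

Because only the "submitted" message is matched, output that contains just `Job with JobID ... has finished.` yields an empty result in this sketch.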
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1530612428

## flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/log4j2-test.properties: ##
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
+# Uncomment to enable codegen logging
+#loggers = testlogger
+#logger.testlogger.name =org.apache.flink.table.planner.codegen
+#logger.testlogger.level = TRACE
+#logger.testlogger.appenderRefs = TestLogger

Review Comment: This is quite useful when running tests (switch from `OFF` to `INFO`), and `OFF` avoids cluttering the logs in CI runs.
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-2003878871 @XComp thank you for your review, gonna address the feedback today (as I had a week off)
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
XComp commented on code in PR #24471: URL: https://github.com/apache/flink/pull/24471#discussion_r1525110715

## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.JobSubmission;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;

Review Comment: You're mixing up JUnit5 and JUnit4 classes/methods. The community agreed to move forward with JUnit5/AssertJ for new tests. It's therefore advised to stick to these dependencies for newly added tests.

## flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java: ##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.JobSubmission;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@RunWith(Parameterized.class)
+public class BatchSQLTest extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class);
+    private static final Path testJar = ResourceTestUtils.getResource("/BatchSQLTestProgram.jar");
+    private static final String testMainClass = "org.apache.flink.sql.tests.BatchSQLTestProgram";
+    private static final Path sqlPath =
+            ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+    @Parameterized.Parameters(name = "shuffleType[{0}]")
+    public static Collection shuffleTypes() {
+        return Arrays.asList("blocking", "hybrid_full", "hybrid_selective");
+    }
+
+    @Rule public final
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-1988639429 @wuchong @XComp Hi guys, here is the PR for https://issues.apache.org/jira/browse/FLINK-20398. I decided to go for `LocalStandaloneFlinkResourceFactory`, as it is already used and is part of `flink-end-to-end-tests-common`. The other options were:
- `MiniCluster`
- `FlinkContainers` (testcontainers-based)

Just tell me if you would rather see one of those implementations for some reason.

Important concern: this test used to be part of `run-nightly-tests.sh`; now I think it would run differently. Should it still run nightly?
Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
flinkbot commented on PR #24471: URL: https://github.com/apache/flink/pull/24471#issuecomment-1985868527

## CI report:
* 35acd12f6acdd8c132f3dbc365c5c24549b7bc18 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]
affo opened a new pull request, #24471: URL: https://github.com/apache/flink/pull/24471

## What is the purpose of the change
Migrate `test_batch_sql.sh` to the Java end-to-end tests framework.

## Brief change log
- implement `BatchSQLTest` porting `test_batch_sql.sh`
- fix issue in getting job ID in `FlinkDistribution`
- remove `test_batch_sql.sh` script
- remove `test_batch_sql.sh` invocations from `run-nightly-tests.sh`

Question: should `run-nightly-tests.sh` now run this Java version?

## Verifying this change
This change added tests and can be verified as follows:
- Added integration tests for end-to-end batch mode SQL query execution

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? NA