Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-17 Thread via GitHub


affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2172535386

   > Yeah, the test failure is unrelated ([FLINK-35042](https://issues.apache.org/jira/browse/FLINK-35042)). Unfortunately, we didn't manage to get it in before the feature freeze. Let's merge it after the release branch for 1.20 is created.
   
   Ah, snap. I'll stay tuned on this, then.
   I will create the JIRA ticket for the "on-the-fly" processing after merging, as stated above.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-16 Thread via GitHub


XComp commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2171654110

   Yeah, the test failure is unrelated (FLINK-35042). Unfortunately, we didn't manage to get it in before the feature freeze. Let's merge it after the release branch for 1.20 is created.





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-13 Thread via GitHub


affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2165511898

   @flinkbot run azure





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-13 Thread via GitHub


JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1637750136


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+    private static final long serialVersionUID = -8455653458083514261L;
+    private final List elements;
+
+    static Generator create(
+            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+        final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+        final long durationMs = durationSeconds * 1000L;
+        final long offsetMs = offsetSeconds * 2000L;
+        final List elements = new ArrayList<>();
+        int keyIndex = 0;
+        long ms = 0;
+        while (ms < durationMs) {
+            elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
   I still think we should keep the on-the-fly implementation. But we can move forward for now and modify it if we run into performance issues.
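   For comparison with the eager loop in the excerpt, here is a minimal sketch of what an "on-the-fly" variant could look like: instead of filling a list in `create`, the row for a given offset is derived arithmetically from the offset. It assumes (this is not visible in the excerpt) that the loop wraps `keyIndex` back to 0 and advances `ms` by `stepMs` after every `numKeys` rows, and that the supplier exposes something like `numElements()`/`get(offset)`. `OnTheFlyGenerator` and `GeneratedRow` are hypothetical names, the payload string is a placeholder, and a plain class stands in for Flink's `Row`/`RowData`.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical sketch: computes each row from its offset instead of storing a List. */
final class OnTheFlyGenerator {

    /** Stand-in for a Flink row: (key, rowtime, payload). */
    static final class GeneratedRow {
        final int key;
        final LocalDateTime rowtime;
        final String payload;

        GeneratedRow(int key, LocalDateTime rowtime, String payload) {
            this.key = key;
            this.rowtime = rowtime;
            this.payload = payload;
        }
    }

    private final int numKeys;
    private final int stepMs;
    private final long offsetMs;
    private final int numElements;

    OnTheFlyGenerator(
            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
        if (numKeys <= 0) {
            throw new IllegalArgumentException("numKeys must be positive");
        }
        this.numKeys = numKeys;
        this.stepMs = (int) (1000 / rowsPerKeyAndSecond);
        if (stepMs <= 0) {
            throw new IllegalArgumentException("rowsPerKeyAndSecond yields a zero step");
        }
        this.offsetMs = offsetSeconds * 2000L; // same factor as in the excerpt
        long durationMs = durationSeconds * 1000L;
        // Timestamps 0, stepMs, 2 * stepMs, ... strictly below durationMs: ceil(durationMs / stepMs).
        long timestamps = (durationMs + stepMs - 1) / stepMs;
        this.numElements = Math.toIntExact(timestamps * numKeys);
    }

    int numElements() {
        return numElements;
    }

    /** Derives key and timestamp from the offset; nothing is stored per row. */
    GeneratedRow get(int offset) {
        if (offset < 0 || offset >= numElements) {
            throw new IndexOutOfBoundsException("offset " + offset);
        }
        int key = offset % numKeys;               // assumed key wrap-around
        long ms = (long) (offset / numKeys) * stepMs;
        LocalDateTime rowtime =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC);
        return new GeneratedRow(key, rowtime, "payload"); // placeholder payload
    }
}
```

   Memory use stays constant regardless of the number of rows; the trade-off is that `get` recomputes the timestamp on every call. The `stepMs` guard is an addition: with more than 1000 rows per key and second, the integer step would truncate to 0.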






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-10 Thread via GitHub


XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1632637479


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##

Review Comment:
   @JingGe any objections? The refactoring should be okay, considering that the amount of data involved is quite low. The actual migration from bash to Java is also done in a separate commit, which enables us to revert it if we feel it's necessary. WDYT?
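   Whether the amount of data is "quite low" can be checked without running the job: the eager loop keeps `numKeys` rows for every timestamp `0, stepMs, 2 * stepMs, ...` below `durationMs`. The sketch below computes that count both by replaying the loop and in closed form. It makes an unverified assumption about how `keyIndex` wraps (the excerpt ends at `elements.add(...)`), and `GeneratorSizing` is a hypothetical helper.

```java
/** Hypothetical helper: how many rows would the eager loop keep in its list? */
final class GeneratorSizing {

    private static int validatedStepMs(int numKeys, float rowsPerKeyAndSecond) {
        if (numKeys <= 0) {
            throw new IllegalArgumentException("numKeys must be positive");
        }
        int stepMs = (int) (1000 / rowsPerKeyAndSecond);
        if (stepMs <= 0) {
            throw new IllegalArgumentException("rowsPerKeyAndSecond yields a zero step");
        }
        return stepMs;
    }

    /** Replays the loop, counting instead of adding rows (assumed key wrap-around). */
    static long countBySimulation(int numKeys, float rowsPerKeyAndSecond, int durationSeconds) {
        final int stepMs = validatedStepMs(numKeys, rowsPerKeyAndSecond);
        final long durationMs = durationSeconds * 1000L;
        long count = 0;
        int keyIndex = 0;
        long ms = 0;
        while (ms < durationMs) {
            count++; // stands in for elements.add(createRow(...))
            keyIndex++;
            if (keyIndex >= numKeys) {
                keyIndex = 0;
                ms += stepMs;
            }
        }
        return count;
    }

    /** Closed form: numKeys rows per timestamp, ceil(durationMs / stepMs) timestamps. */
    static long countByFormula(int numKeys, float rowsPerKeyAndSecond, int durationSeconds) {
        final int stepMs = validatedStepMs(numKeys, rowsPerKeyAndSecond);
        final long durationMs = durationSeconds * 1000L;
        final long timestamps = (durationMs + stepMs - 1) / stepMs;
        return timestamps * numKeys;
    }
}
```

   For instance, the illustrative (not the test's actual) parameters `numKeys = 10`, `rowsPerKeyAndSecond = 100`, `durationSeconds = 60` give a 10 ms step, 6,000 timestamps, and therefore 60,000 rows held in the list.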






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-05 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1627696194


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##

Review Comment:
   @JingGe @XComp Thank you for the feedback!
   
   @XComp I would merge this as it is.
   
   In the background, I was already working on something similar. I will create another issue for adding a test source for batch tests and for the Table API.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-05 Thread via GitHub


XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1627611020


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##

Review Comment:
   > Got it, the number of records is not huge, that's why I did not mention that
   
   True, that's a valid point. I didn't check the number of elements as part of my last comment. I'll leave it up to you whether it's done in a new PR or as part of this PR.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-05 Thread via GitHub


JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1627390009


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##

Review Comment:
   Sounds great! Please feel free to create a follow-up ticket and contribute the new generator in a new PR.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-05 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1627196144


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##

Review Comment:
   @JingGe @XComp 
   
   Got it, the number of records is not huge, that's why I did not mention that.
   
   However, I understand your concerns as well.
   
   At this point, I would write another generator as part of this PR.
   
   However, I would provide it as part of test-utils rather than confining it to batch, as other tests could benefit from it.
   
   What do you guys think?






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-04 Thread via GitHub


XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1626077906


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##

Review Comment:
   Jing actually has a good point on the memory consumption. I missed that one. We should continue generating the records on-the-fly to be closer to what the original test did.
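   Generating "on-the-fly" can also mean producing the rows strictly sequentially, keeping only the loop state in memory. A minimal sketch as a plain `java.util.Iterator`, under the same unverified assumption about how the key wraps around; `StreamingRowIterator` is a hypothetical name, and a `long[]` of `{key, timestampMs}` stands in for a real row type. It offers no random access by offset, so it would fit a sequential source rather than a `get(offset)`-style supplier.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * Hypothetical sketch: yields {key, timestampMs} pairs one at a time. Only the loop
 * state (keyIndex, ms) is kept, so memory use does not grow with the row count.
 */
final class StreamingRowIterator implements Iterator<long[]> {
    private final int numKeys;
    private final int stepMs;
    private final long durationMs;
    private final long offsetMs;
    private int keyIndex = 0;
    private long ms = 0;

    StreamingRowIterator(
            int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
        if (numKeys <= 0) {
            throw new IllegalArgumentException("numKeys must be positive");
        }
        this.numKeys = numKeys;
        this.stepMs = (int) (1000 / rowsPerKeyAndSecond);
        if (stepMs <= 0) {
            throw new IllegalArgumentException("rowsPerKeyAndSecond yields a zero step");
        }
        this.durationMs = durationSeconds * 1000L;
        this.offsetMs = offsetSeconds * 2000L; // same factor as in the excerpt
    }

    @Override
    public boolean hasNext() {
        return ms < durationMs;
    }

    @Override
    public long[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        long[] row = {keyIndex, ms + offsetMs};
        // Assumed wrap-around: after numKeys rows, move on to the next timestamp.
        keyIndex++;
        if (keyIndex >= numKeys) {
            keyIndex = 0;
            ms += stepMs;
        }
        return row;
    }
}
```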






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-04 Thread via GitHub


JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1624540081


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##

Review Comment:
   The original implementation was only for batch in `BatchSQLTestProgram`. This PR is a migration and should not implicitly introduce a big change to the data generation that might cause performance issues later. In addition, the new implementation is still in the `flink-batch-sql-test` module, which should be used only for batch.
   Not sure if there are already similar generators in the stream-sql-test. If not, a new JIRA task could be created to add the new implementation to the stream SQL test.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-04 Thread via GitHub


JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1624540081


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##

Review Comment:
   The original implementation was only for batch in `BatchSQLTestProgram`. This PR is a migration and should not implicitly introduce a big change to the data generation that might cause performance issues later. In addition, the new implementation is still in the `flink-batch-sql-test` module, which should be used only for batch.
   Not sure if there are already similar generators in the stream-sql-test. If not, a new JIRA task could be created to add the new generator implementation to the stream SQL test.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-04 Thread via GitHub


JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1624540081


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##

Review Comment:
   The original implementation was only for batch in `BatchSQLTestProgram`. This PR is a migration and should not implicitly introduce a big change to the data generation that might cause performance issues later. In addition, the new implementation is still in the `flink-batch-sql-test` module, which should be used only for batch.
   Not sure if there are already similar generators in the stream-sql-test. If not, a new JIRA task could be created to add the new implementation to the stream SQL test.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-03 Thread via GitHub


JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1624540081


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+final long durationMs = durationSeconds * 1000L;
+final long offsetMs = offsetSeconds * 2000L;
+final List elements = new ArrayList<>();
+int keyIndex = 0;
+long ms = 0;
+while (ms < durationMs) {
+elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
   The original implementation was only for batch in `BatchSQLTestProgram`. 
This PR is a migration and should not implicitly introduce a big change to the 
data generation that might cause performance issues later. In addition, the new 
implementation is still in the `flink-batch-sql-test` module, which should be 
used only for batch.
   Not sure whether there are already similar generators in the stream-sql-test. 
If not, a new JIRA task could be created to add the new implementation to the 
stream SQL test.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-03 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1624446134


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+final long durationMs = durationSeconds * 1000L;
+final long offsetMs = offsetSeconds * 2000L;
+final List elements = new ArrayList<>();
+int keyIndex = 0;
+long ms = 0;
+while (ms < durationMs) {
+elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
   In other words, I could have another implementation of the bounded source 
without any fault-tolerance guarantee  






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-03 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1624443303


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+final long durationMs = durationSeconds * 1000L;
+final long offsetMs = offsetSeconds * 2000L;
+final List elements = new ArrayList<>();
+int keyIndex = 0;
+long ms = 0;
+while (ms < durationMs) {
+elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
   @JingGe yeah, nothing related strictly to this case.
   
   The `FromElementsSource` is actually generic: it can be, and is, used in 
the streaming case.
   
   Here I am using it in batch table mode.
   
   I am just reusing it because that is possible.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-03 Thread via GitHub


JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1624191535


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+final long durationMs = durationSeconds * 1000L;
+final long offsetMs = offsetSeconds * 2000L;
+final List elements = new ArrayList<>();
+int keyIndex = 0;
+long ms = 0;
+while (ms < durationMs) {
+elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
   Thanks for the reply. Batch processing commonly does not rely on offsets. 
Would you please help me understand why the source needs to be fault-tolerant 
and to support getting records by offset in batch mode?
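
Editorial note on the offset question: `FromElementsSource` reads elements by position, which is presumably what lets a restored reader continue from a recorded offset. Random access by offset does not by itself require holding every row in memory, though; the row at a given offset can be derived arithmetically. The following is a minimal sketch under stated assumptions: the `numElements()` / `get(int offset)` contract is our assumption about `FromElementsSource.ElementsSupplier`, and `OffsetSupplier`, `SimpleRow` and `OffsetRowSupplier` are hypothetical names, with `SimpleRow` standing in for Flink's `Row` so the snippet compiles on its own.

```java
import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical stand-in for the assumed ElementsSupplier contract. */
interface OffsetSupplier<T> extends Serializable {
    int numElements();

    T get(int offset);
}

/** Hypothetical plain row type standing in for Flink's Row. */
final class SimpleRow {
    final int key;
    final LocalDateTime rowtime;
    final String payload;

    SimpleRow(int key, LocalDateTime rowtime, String payload) {
        this.key = key;
        this.rowtime = rowtime;
        this.payload = payload;
    }
}

/**
 * Derives the row at any offset arithmetically, so nothing is materialized up front.
 * Ordering matches the suggested while loop: every key for one timestamp, then the next step.
 */
final class OffsetRowSupplier implements OffsetSupplier<SimpleRow> {
    private static final long serialVersionUID = 1L;

    private final int numKeys;
    private final int stepMs;
    private final long offsetMs;
    private final int numElements;

    OffsetRowSupplier(int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
        this.numKeys = numKeys;
        this.stepMs = (int) (1000 / rowsPerKeyAndSecond);
        if (numKeys <= 0 || stepMs <= 0) {
            throw new IllegalArgumentException("numKeys and stepMs must be positive");
        }
        this.offsetMs = offsetSeconds * 2000L; // same factor as the code under review
        long durationMs = Math.max(0L, durationSeconds * 1000L);
        // Number of timestamps ms = 0, stepMs, 2 * stepMs, ... with ms < durationMs.
        long steps = (durationMs + stepMs - 1) / stepMs;
        this.numElements = Math.toIntExact(steps * numKeys);
    }

    @Override
    public int numElements() {
        return numElements;
    }

    @Override
    public SimpleRow get(int offset) {
        if (offset < 0 || offset >= numElements) {
            throw new IndexOutOfBoundsException("offset " + offset);
        }
        int keyIndex = offset % numKeys;
        long ms = (long) (offset / numKeys) * stepMs;
        LocalDateTime rowtime =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC);
        return new SimpleRow(keyIndex, rowtime, "Some payload...");
    }
}
```

With this shape the supplier's memory footprint stays constant regardless of `durationSeconds`, and any offset can be recomputed after a restore. Whether that extra indirection is worth it for a batch-only test is the trade-off debated in this thread.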






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-06-03 Thread via GitHub


JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1624191535


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+final long durationMs = durationSeconds * 1000L;
+final long offsetMs = offsetSeconds * 2000L;
+final List elements = new ArrayList<>();
+int keyIndex = 0;
+long ms = 0;
+while (ms < durationMs) {
+elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
   Thanks for the reply. Batch processing does not commonly rely on offsets. 
Would you please help me understand why the source needs to be fault-tolerant 
and to support getting records by offset in batch mode?






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-31 Thread via GitHub


JingGe commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1622617735


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
+final long durationMs = durationSeconds * 1000L;
+final long offsetMs = offsetSeconds * 2000L;
+final List elements = new ArrayList<>();
+int keyIndex = 0;
+long ms = 0;
+while (ms < durationMs) {
+elements.add(createRow(keyIndex++, ms, offsetMs));

Review Comment:
   The new implementation will consume more memory than the old one, which 
generated each `row` iteratively on the fly. This could become an issue for 
batch tests with large data volumes.
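
For comparison, the on-the-fly generation the old `BatchSQLTestProgram` relied on can be sketched as a plain `Iterator` that keeps only its current position. This is a hypothetical reconstruction (`LazyRowIterator` is not a Flink class), and it emits `{keyIndex, epochMillis}` pairs instead of Flink rows to stay self-contained. Memory use is constant, but the only way to resume after a failure is to replay from the start, i.e. a bounded source without a fault-tolerance guarantee.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical on-the-fly generator: yields {keyIndex, epochMillis} without buffering rows. */
final class LazyRowIterator implements Iterator<long[]> {
    private final int numKeys;
    private final int stepMs;
    private final long durationMs;
    private final long offsetMs;

    // The only state kept between calls: the current key and timestamp.
    private int keyIndex = 0;
    private long ms = 0;

    LazyRowIterator(int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
        this.numKeys = numKeys;
        this.stepMs = (int) (1000 / rowsPerKeyAndSecond);
        if (numKeys <= 0 || stepMs <= 0) {
            throw new IllegalArgumentException("numKeys and stepMs must be positive");
        }
        this.durationMs = durationSeconds * 1000L;
        this.offsetMs = offsetSeconds * 2000L; // same factor as the code under review
    }

    @Override
    public boolean hasNext() {
        return ms < durationMs;
    }

    @Override
    public long[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        long[] row = {keyIndex, ms + offsetMs};
        keyIndex++;
        if (keyIndex >= numKeys) {
            // All keys emitted for this timestamp: advance to the next step.
            keyIndex = 0;
            ms += stepMs;
        }
        return row;
    }
}
```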



##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * The sources are generated and bounded. The result is always constant.
- *
- * Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {
+private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class);
+
+private static final Path sqlPath =
+ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+@TempDir private Path tmp;
+
+@RegisterExtension
+private static final MiniClusterExtension MINI_CLUSTER =
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(2)
+.setNumberSlotsPerTaskManager(1)
+.build());
+
+private Path result;
+
+@BeforeEach
+public void before() {
+this.result = tmp.resolve(String.format("result-%s", UUID.randomUUID()));
+LOG.info("Results for this test will be stored at: {}", this.result);
+}
+
+@ParameterizedTest
+@EnumSource(
+

Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-30 Thread via GitHub


affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2139041576

   @XComp done!
   
   Don't worry in any case, I loved the review process. This is my first 
contribution, and it is part of the learning for the next ones.





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-30 Thread via GitHub


XComp commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2138886095

   One final thing: somehow I wasn't able to do it myself. Can you change the 
commit message prefix from `[refactor]` to `[FLINK-20398]`? "refactor" isn't a 
prefix the Flink community usually uses.





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-30 Thread via GitHub


XComp commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2138868656

   I'm not gonna wait for another CI round. Looks like the CI bot didn't pick 
up the rerun command. Anyway, I verified that the test ran (see 
[logs](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59935=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=0f3adb59-eefa-51c6-2858-3654d9e0749d=14026)).





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-29 Thread via GitHub


XComp commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2137605027

   @flinkbot run azure





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-29 Thread via GitHub


XComp commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2137601891

   CI test failure is unrelated: FLINK-34513





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-29 Thread via GitHub


XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1618480332


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+InternalGenerator gen =
+new InternalGenerator(
+numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L);
+List elements = new ArrayList<>();
+gen.forEachRemaining(elements::add);
+return new Generator(elements);
+}

Review Comment:
   You're right - I got a bit carried away. We shouldn't optimize it any 
further. But let's keep the changes as they are now since you applied it.  






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-29 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1618451567


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+InternalGenerator gen =
+new InternalGenerator(
+numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L);
+List elements = new ArrayList<>();
+gen.forEachRemaining(elements::add);
+return new Generator(elements);
+}

Review Comment:
   I applied your suggestion as I like it  






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-29 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1618443253


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+InternalGenerator gen =
+new InternalGenerator(
+numKeys, durationSeconds * 1000L, sleepMs, offsetSeconds * 2000L);
+List elements = new ArrayList<>();
+gen.forEachRemaining(elements::add);
+return new Generator(elements);
+}

Review Comment:
   I did not want to change the original logic of the generator itself too much.
   
   But it makes sense  






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-27 Thread via GitHub


XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1616241149


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+class Generator implements FromElementsSource.ElementsSupplier {
+private static final long serialVersionUID = -8455653458083514261L;
+private final List elements;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int 
offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+InternalGenerator gen =
+new InternalGenerator(
+numKeys, durationSeconds * 1000L, sleepMs, 
offsetSeconds * 2000L);
+List elements = new ArrayList<>();
+gen.forEachRemaining(elements::add);
+return new Generator(elements);
+}

Review Comment:
   ```suggestion
       static Generator create(
               int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
           final int stepMs = (int) (1000 / rowsPerKeyAndSecond);
           final long durationMs = durationSeconds * 1000L;
           final long offsetMs = offsetSeconds * 2000L;
           final List<Row> elements = new ArrayList<>();
   
           int keyIndex = 0;
           long ms = 0;
           while (ms < durationMs) {
               elements.add(createRow(keyIndex++, ms, offsetMs));
               if (keyIndex >= numKeys) {
                   keyIndex = 0;
                   ms += stepMs;
               }
           }
   
           return new Generator(elements);
       }
   
       private static Row createRow(int keyIndex, long milliseconds, long offsetMillis) {
           return Row.of(
                   keyIndex,
                   LocalDateTime.ofInstant(
                           Instant.ofEpochMilli(milliseconds + offsetMillis), ZoneOffset.UTC),
                   "Some payload...");
       }
   ```
   nit: we could also get rid of the `InternalGenerator` class. It's just a while loop in the end. WDYT?
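   The loop from the suggestion can be exercised on its own with plain JDK classes. This is a minimal sketch, not the PR's code: `SimpleRow` is a hypothetical stand-in for Flink's `Row`, the offset is passed directly in milliseconds (so the `offsetSeconds * 2000L` conversion is left out), and the `stepMs <= 0` guard is an addition of this sketch only.

   ```java
   import java.time.Instant;
   import java.time.LocalDateTime;
   import java.time.ZoneOffset;
   import java.util.ArrayList;
   import java.util.List;

   public class GeneratorSketch {

       /** Hypothetical stand-in for org.apache.flink.types.Row, used only for illustration. */
       static final class SimpleRow {
           final int key;
           final LocalDateTime rowtime;
           final String payload;

           SimpleRow(int key, LocalDateTime rowtime, String payload) {
               this.key = key;
               this.rowtime = rowtime;
               this.payload = payload;
           }
       }

       /** Emits one row per key at every step until durationMs is reached. */
       static List<SimpleRow> generate(int numKeys, long stepMs, long durationMs, long offsetMs) {
           // Guard added in this sketch only: a zero step would never advance ms.
           if (stepMs <= 0) {
               throw new IllegalArgumentException("stepMs must be positive");
           }
           List<SimpleRow> rows = new ArrayList<>();
           int keyIndex = 0;
           long ms = 0;
           while (ms < durationMs) {
               LocalDateTime rowtime =
                       LocalDateTime.ofInstant(Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC);
               rows.add(new SimpleRow(keyIndex++, rowtime, "Some payload..."));
               // After the last key, wrap around and advance the clock by one step.
               if (keyIndex >= numKeys) {
                   keyIndex = 0;
                   ms += stepMs;
               }
           }
           return rows;
       }

       public static void main(String[] args) {
           List<SimpleRow> rows = generate(2, 1000L, 3000L, 0L);
           System.out.println(rows.size()); // 6 (2 keys x 3 steps)
           System.out.println(rows.get(rows.size() - 1).rowtime); // 1970-01-01T00:00:02
       }
   }
   ```

   The whole generator state is just `keyIndex` and `ms` local to one method, which is why a separate iterator class is not strictly needed.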






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-23 Thread via GitHub


affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2127135299

   @XComp Hello!
   
   Final touches are done and your comments are addressed.
   I added the capability for `FromElementsSource` to accept an `ElementsSupplier` at initialization.
   
   The reason I had to implement a `RowData` that is also `Serializable` was that `FromElementsSource` had a field `List<OUT> elements`, where `OUT` can be non-serializable (as is the case for `RowData`), so the operator couldn't be serialized when the job was starting.
   
   I made it accept an `ElementsSupplier extends Serializable` so that it is clear that the supplier must be serializable.
   
   In my use case, I simply kept the previous implementation based on `Row` (which is serializable) and convert it to `RowData` on `get`. No class now has `RowData` fields that would prevent it from being serialized.
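   The serialization issue can be reproduced with plain Java serialization. In this sketch `NonSerializableRow` is a hypothetical stand-in for `RowData`, and the `ElementsSupplier` interface (`numElements()` / `get(int)`) is an assumption modeled on the description above, not necessarily the exact signature added to `FromElementsSource`.

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.NotSerializableException;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.ArrayList;
   import java.util.List;

   public class SupplierSerializationSketch {

       /** Hypothetical stand-in for a non-serializable element type such as RowData. */
       static final class NonSerializableRow {
           final int key;

           NonSerializableRow(int key) {
               this.key = key;
           }
       }

       /** Assumed supplier contract: the supplier itself must be Serializable. */
       interface ElementsSupplier<T> extends Serializable {
           int numElements();

           T get(int offset);
       }

       /** Keeps materialized non-serializable elements in a field, like a list of RowData. */
       static final class EagerSource implements Serializable {
           final List<NonSerializableRow> elements = new ArrayList<>();
       }

       /** Keeps only serializable data and builds the non-serializable element on get(). */
       static final class LazySupplier implements ElementsSupplier<NonSerializableRow> {
           private final ArrayList<Integer> keys = new ArrayList<>();

           LazySupplier(int count) {
               for (int i = 0; i < count; i++) {
                   keys.add(i);
               }
           }

           @Override
           public int numElements() {
               return keys.size();
           }

           @Override
           public NonSerializableRow get(int offset) {
               return new NonSerializableRow(keys.get(offset));
           }
       }

       /** Returns true if Java serialization of the object succeeds. */
       static boolean isSerializable(Object o) {
           try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
               out.writeObject(o);
               return true;
           } catch (NotSerializableException e) {
               return false;
           } catch (IOException e) {
               throw new IllegalStateException(e);
           }
       }

       public static void main(String[] args) {
           EagerSource eager = new EagerSource();
           eager.elements.add(new NonSerializableRow(1));
           System.out.println(isSerializable(eager)); // false
           System.out.println(isSerializable(new LazySupplier(3))); // true
       }
   }
   ```

   Only the supplier's own fields are written out, so converting to the non-serializable type inside `get` keeps the operator serializable.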
   





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-22 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1609692387


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.table.data.RowData;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+class Generator implements Iterator, Iterable {
+final int numKeys;
+
+private int keyIndex = 0;
+
+private final long durationMs;
+private final long stepMs;
+private final long offsetMs;
+private long ms = 0;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int 
offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+return new Generator(numKeys, durationSeconds * 1000L, sleepMs, 
offsetSeconds * 2000L);
+}
+
+Generator(int numKeys, long durationMs, long stepMs, long offsetMs) {
+this.numKeys = numKeys;
+this.durationMs = durationMs;
+this.stepMs = stepMs;
+this.offsetMs = offsetMs;
+}
+
+@Override
+public boolean hasNext() {
+return ms < durationMs;
+}
+
+@Override
+public RowData next() {
+if (!hasNext()) {
+throw new NoSuchElementException();
+}
+RowData row =
+new GeneratedRow(
+keyIndex,
+LocalDateTime.ofInstant(
+Instant.ofEpochMilli(ms + offsetMs), 
ZoneOffset.UTC),
+"Some payload...");
+++keyIndex;
+if (keyIndex >= numKeys) {
+keyIndex = 0;
+ms += stepMs;
+}
+return row;
+}
+
+@NotNull

Review Comment:
   I think this falls under this case:
   > but can be used in certain cases to override a previous annotation,
   
   since `iterator()` overrides:
   
   ```java
   public interface Iterable<T> {
       /**
        * Returns an iterator over elements of type {@code T}.
        *
        * @return an Iterator.
        */
       @NotNull
       Iterator<T> iterator();
   ```






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-22 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1609688204


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/GeneratedRow.java:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.util.Objects;
+
+class GeneratedRow implements RowData, Serializable {

Review Comment:
   The problem is that, to avoid using deprecated functions and the like, you need to use `static SourceProvider of(Source source, @Nullable Integer sourceParallelism)`, which forces you to use `RowData`. Anyhow, I am going to dig deeper into this and see whether this is the only solution.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-05-15 Thread via GitHub


XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1600075906


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/GeneratedRow.java:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.util.Objects;
+
+class GeneratedRow implements RowData, Serializable {

Review Comment:
   Was there a reason to implement a dedicated `RowData` subclass rather than using the `Row` class, as is done in the original test code?



##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.table.data.RowData;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+class Generator implements Iterator, Iterable {
+final int numKeys;

Review Comment:
   ```suggestion
   private final int numKeys;
   ```
   There's no real reason to make this field package-private, is there?



##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.table.data.RowData;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+class Generator implements Iterator, Iterable {
+final int numKeys;
+
+private int keyIndex = 0;
+
+private final long durationMs;
+private final long stepMs;
+private final long offsetMs;
+private long ms = 0;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int 
offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+return 

Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-04-12 Thread via GitHub


morazow commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1563396268


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -114,104 +129,24 @@ public void testBatchSQL(BatchShuffleMode shuffleMode, 
@TempDir Path tmpDir) thr
 jobClient.getJobExecutionResult().get();
 
 final String expected =
-"1980,1970-01-01 00:00:00.0\n"
-+ "1980,1970-01-01 00:00:20.0\n"
-+ "1980,1970-01-01 00:00:40.0\n";
+"1980,1970-01-01 00:00:00\n"

Review Comment:
   Got it  






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-04-12 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1562170708


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.table.data.RowData;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+class Generator implements Iterator, Iterable {
+final int numKeys;
+
+private int keyIndex = 0;
+
+private final long durationMs;
+private final long stepMs;
+private final long offsetMs;
+private long ms = 0;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int 
offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+return new Generator(numKeys, durationSeconds * 1000L, sleepMs, 
offsetSeconds * 2000L);
+}
+
+Generator(int numKeys, long durationMs, long stepMs, long offsetMs) {
+this.numKeys = numKeys;
+this.durationMs = durationMs;
+this.stepMs = stepMs;
+this.offsetMs = offsetMs;
+}
+
+@Override
+public boolean hasNext() {
+return ms < durationMs;
+}
+
+@Override
+public RowData next() {
+if (!hasNext()) {
+throw new NoSuchElementException();
+}
+RowData row =
+new GeneratedRow(
+keyIndex,
+LocalDateTime.ofInstant(
+Instant.ofEpochMilli(ms + offsetMs), 
ZoneOffset.UTC),
+"Some payload...");

Review Comment:
   The goal of this PR is to port the test.
   
   I preserved its logic 100%, so I left the record generation as it was in the original code.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-04-12 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1562169902


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -114,104 +129,24 @@ public void testBatchSQL(BatchShuffleMode shuffleMode, 
@TempDir Path tmpDir) thr
 jobClient.getJobExecutionResult().get();
 
 final String expected =
-"1980,1970-01-01 00:00:00.0\n"
-+ "1980,1970-01-01 00:00:20.0\n"
-+ "1980,1970-01-01 00:00:40.0\n";
+"1980,1970-01-01 00:00:00\n"

Review Comment:
   Hello!
   Yeah, the previous `CsvTableSink` is deprecated, so I replaced it with the `filesystem` connector.
   I only had to adjust the expected result because of how timestamps are formatted internally.
   I just removed the trailing `.0`; the result is, after all, exactly the same.
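   The `.0` difference can be reproduced with JDK classes alone. This sketch assumes, without having checked Flink's code, that the deprecated `CsvTableSink` rendered timestamps via `java.sql.Timestamp#toString`, while the `filesystem` connector's CSV output omits a zero fractional part; `legacyFormat` and `plainFormat` are hypothetical names.

   ```java
   import java.sql.Timestamp;
   import java.time.LocalDateTime;
   import java.time.format.DateTimeFormatter;

   public class TimestampFormatSketch {

       private static final DateTimeFormatter SECONDS =
               DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

       /** java.sql.Timestamp#toString always keeps at least one fractional digit. */
       static String legacyFormat(LocalDateTime value) {
           return Timestamp.valueOf(value).toString();
       }

       /** A seconds-precision pattern drops the fraction entirely. */
       static String plainFormat(LocalDateTime value) {
           return value.format(SECONDS);
       }

       public static void main(String[] args) {
           LocalDateTime t = LocalDateTime.of(1970, 1, 1, 0, 0, 20);
           System.out.println(legacyFormat(t)); // 1970-01-01 00:00:20.0
           System.out.println(plainFormat(t)); // 1970-01-01 00:00:20
       }
   }
   ```

   Both strings denote the same instant, which is why only the expected text needs to change, not the test's logic.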






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-04-11 Thread via GitHub


morazow commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1561946899


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/Generator.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.table.data.RowData;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+class Generator implements Iterator, Iterable {
+final int numKeys;
+
+private int keyIndex = 0;
+
+private final long durationMs;
+private final long stepMs;
+private final long offsetMs;
+private long ms = 0;
+
+static Generator create(
+int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int 
offsetSeconds) {
+int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+return new Generator(numKeys, durationSeconds * 1000L, sleepMs, 
offsetSeconds * 2000L);
+}
+
+Generator(int numKeys, long durationMs, long stepMs, long offsetMs) {
+this.numKeys = numKeys;
+this.durationMs = durationMs;
+this.stepMs = stepMs;
+this.offsetMs = offsetMs;
+}
+
+@Override
+public boolean hasNext() {
+return ms < durationMs;
+}
+
+@Override
+public RowData next() {
+if (!hasNext()) {
+throw new NoSuchElementException();
+}
+RowData row =
+new GeneratedRow(
+keyIndex,
+LocalDateTime.ofInstant(
+Instant.ofEpochMilli(ms + offsetMs), 
ZoneOffset.UTC),
+"Some payload...");

Review Comment:
   I saw this was in the original change, but should we randomize the payload?
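   If the payload were randomized, one option (an assumption, not something the PR or the original script does) is to draw it from a `java.util.Random` with a fixed seed, so every run still produces the same rows. `PayloadSketch`, `randomPayload`, and the seed value `42` are illustrative only.

   ```java
   import java.util.Random;

   public class PayloadSketch {

       private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";

       /** Builds a lowercase payload of the given length; the same seed yields the same string. */
       static String randomPayload(Random random, int length) {
           StringBuilder sb = new StringBuilder(length);
           for (int i = 0; i < length; i++) {
               sb.append(ALPHABET.charAt(random.nextInt(ALPHABET.length())));
           }
           return sb.toString();
       }

       public static void main(String[] args) {
           String a = randomPayload(new Random(42L), 16);
           String b = randomPayload(new Random(42L), 16);
           System.out.println(a.equals(b)); // true: reproducible across runs
       }
   }
   ```

   A fixed seed keeps a failing run reproducible; an unseeded `new Random()` would make the generated data differ between runs.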



##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -114,104 +129,24 @@ public void testBatchSQL(BatchShuffleMode shuffleMode, 
@TempDir Path tmpDir) thr
 jobClient.getJobExecutionResult().get();
 
 final String expected =
-"1980,1970-01-01 00:00:00.0\n"
-+ "1980,1970-01-01 00:00:20.0\n"
-+ "1980,1970-01-01 00:00:40.0\n";
+"1980,1970-01-01 00:00:00\n"

Review Comment:
   Why is this change required?






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-04-11 Thread via GitHub


affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2049852124

   @XComp Enjoy your vacation!
   I finally also addressed the deprecation warnings and implemented a custom connector via `DynamicTableSource`.
   
   It turned out to be quite tough, probably because this isn't a common use case, or because these newer APIs aren't well documented yet.
   
   I wanted to use `TableEnvironment.fromValues`; however, I couldn't, because the test was hanging...
   I want to understand why and, if needed, file an issue for it.





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-04-03 Thread via GitHub


XComp commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2033738415

   fyi: I will be off for the rest of April and, therefore, won't be able to finalize this PR. You might want to reach out to other committers or expect a delay in my responses.





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-28 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1543172250


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * The sources are generated and bounded. The result is always constant.
- *
- * Parameters: -outputPath output file path for CsvTableSink; -sqlStatement 
SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {
+private static final Logger LOG = 
LoggerFactory.getLogger(BatchSQLTest.class);
+
+private static final Path sqlPath =
+ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+@TempDir private Path tmp;
+
+@RegisterExtension
+private static final MiniClusterExtension MINI_CLUSTER =
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(2)
+.setNumberSlotsPerTaskManager(1)
+.build());
+
+private Path result;
+
+@BeforeEach
+public void before() {
+this.result = tmp.resolve(String.format("result-%s", 
UUID.randomUUID()));
+LOG.info("Results for this test will be stored at: {}", this.result);
+}
+
+@ParameterizedTest
+@EnumSource(
+value = BatchShuffleMode.class,
+names = {
+"ALL_EXCHANGES_BLOCKING",

Review Comment:
   I'm not an expert either, but I tried it and got an `IllegalState` error:
   
   
   > At the moment, adaptive batch scheduler requires batch workloads to be 
executed with types of all edges being BLOCKING or 
HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 
'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 
'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'. Note that for 
DataSet jobs which do not recognize the aforementioned shuffle mode, the 
ExecutionMode needs to be BATCH_FORCED to force BLOCKING shuffle






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-27 Thread via GitHub


XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1541160078


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -34,66 +35,105 @@
 import org.apache.flink.table.sinks.CsvTableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.UUID;
 
-/**
- * End-to-end test for batch SQL queries.
- *
- * The sources are generated and bounded. The result is always constant.
- *
- * Parameters: -outputPath output file path for CsvTableSink; -sqlStatement SQL statement that
- * will be executed as executeSql
- */
-public class BatchSQLTestProgram {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@ExtendWith(TestLoggerExtension.class)
+public class BatchSQLTest {

Review Comment:
   ```suggestion
   class BatchSQLTest {
   ```
   JUnit5 allows test classes to be package-private. This lets you remove the JavaDoc and still comply with checkstyle. The JavaDoc itself doesn't add much value.
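The JavaDoc removed in this hunk says that the sources are generated and bounded, so the result is always constant. The hunk also keeps the `Iterator` and `NoSuchElementException` imports. Here is a minimal plain-Java sketch of such a bounded, deterministic generator. The class name, constructor parameters and row layout are hypothetical, and `Object[]` stands in for Flink's `Row`.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * Hypothetical bounded generator: emits a fixed number of rows whose content depends only on the
 * row index, so every run produces the same data and therefore the same query result.
 */
public class BoundedRowGenerator implements Iterator<Object[]> {

    private final int numRecords;
    private final LocalDateTime start;
    private int emitted = 0;

    BoundedRowGenerator(int numRecords, long startEpochSecond) {
        this.numRecords = numRecords;
        this.start = LocalDateTime.ofEpochSecond(startEpochSecond, 0, ZoneOffset.UTC);
    }

    @Override
    public boolean hasNext() {
        return emitted < numRecords;
    }

    @Override
    public Object[] next() {
        if (!hasNext()) {
            // Bounded source: signal exhaustion as the Iterator contract requires.
            throw new NoSuchElementException();
        }
        int i = emitted++;
        // Assumed row layout for illustration: (key, value, event time).
        return new Object[] {i % 10, "value-" + i, start.plusSeconds(i)};
    }

    public static void main(String[] args) {
        BoundedRowGenerator gen = new BoundedRowGenerator(3, 0L);
        while (gen.hasNext()) {
            Object[] row = gen.next();
            System.out.println(row[0] + ", " + row[1] + ", " + row[2]);
        }
    }
}
```

Each row is derived only from its index and a fixed start time, with no clock or randomness. That lets a test assert an exact expected result.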



##
flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/log4j2-test.properties:
##
@@ -0,0 +1,31 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
+# Uncomment to enable codegen logging
+#loggers = testlogger
+#logger.testlogger.name =org.apache.flink.table.planner.codegen
+#logger.testlogger.level = TRACE
+#logger.testlogger.appenderRefs = TestLogger

Review Comment:
   ```suggestion
   
   # Set root logger level to OFF to not flood build logs
   # set manually to INFO for debugging purposes
   rootLogger.level=OFF
   rootLogger.appenderRef.test.ref=TestLogger
   
   appender.testlogger.name=TestLogger
   appender.testlogger.type=CONSOLE
   appender.testlogger.target=SYSTEM_ERR
   appender.testlogger.layout.type=PatternLayout
   appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
   
   # Uncomment to enable codegen logging
   #loggers = testlogger
   #logger.testlogger.name =org.apache.flink.table.planner.codegen
   #logger.testlogger.level = TRACE
   #logger.testlogger.appenderRefs = TestLogger
   ```
   That is, let's add some empty lines in between the sections.




Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-25 Thread via GitHub


affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2018234458

   @XComp everything should be ok now  





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-19 Thread via GitHub


affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2007495060

   @XComp 
   
   It required quite an effort, honestly, but here we are with the JUnit5 version of what I had before.
   
   This also made it possible to avoid starting a separate jar: the code is now included directly in the test and runs directly against the obtained MiniCluster.
   
   Thank you for your detailed review!





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-19 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1530613302


##
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java:
##
@@ -234,10 +234,7 @@ public JobID submitJob(final JobSubmission jobSubmission, Duration timeout) thro
 
 LOG.info("Running {}.", commands.stream().collect(Collectors.joining(" ")));
 
-final Pattern pattern =
-jobSubmission.isDetached()
? Pattern.compile("Job has been submitted with JobID (.*)")
-: Pattern.compile("Job with JobID (.*) has finished.");
+final Pattern pattern = Pattern.compile("Job has been submitted with JobID (.*)");

Review Comment:
   It used to be, since that string matching is apparently obsolete now.
   With the latest changes, this is not relevant anymore.
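The diff above makes `FlinkDistribution.submitJob` always match the line "Job has been submitted with JobID (.*)" instead of choosing a pattern based on detached mode. Here is a minimal plain-Java sketch of extracting the ID with that pattern. The helper name `extractJobId` and the sample output lines are made up, and the sketch does not reproduce how `FlinkDistribution` actually reads the process output.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobIdExtractor {

    // Same regular expression as in the diff; group 1 captures the rest of the line.
    private static final Pattern SUBMITTED =
            Pattern.compile("Job has been submitted with JobID (.*)");

    /** Returns the JobID from the first matching output line, if any (hypothetical helper). */
    static Optional<String> extractJobId(List<String> outputLines) {
        for (String line : outputLines) {
            Matcher m = SUBMITTED.matcher(line);
            // find() also tolerates a prefix (e.g. a timestamp) before the message.
            if (m.find()) {
                return Optional.of(m.group(1).trim());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> output = List.of(
                "Starting execution of program",
                "Job has been submitted with JobID 0123456789abcdef0123456789abcdef");
        System.out.println(extractJobId(output).orElse("<none>"));
    }
}
```

With a single pattern, an attached submission that never prints this line leaves the ID empty instead of matching the old "Job with JobID (.*) has finished." message.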






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-19 Thread via GitHub


affo commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1530612428


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/resources/log4j2-test.properties:
##
@@ -0,0 +1,31 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
+# Uncomment to enable codegen logging
+#loggers = testlogger
+#logger.testlogger.name =org.apache.flink.table.planner.codegen
+#logger.testlogger.level = TRACE
+#logger.testlogger.appenderRefs = TestLogger

Review Comment:
   This is quite useful when running tests (switch from `OFF` to `INFO`), while `OFF` prevents cluttering the logs in CI runs.






Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-18 Thread via GitHub


affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-2003878871

   @XComp thank you for your review, gonna address the feedback today (as I had a week off).





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-14 Thread via GitHub


XComp commented on code in PR #24471:
URL: https://github.com/apache/flink/pull/24471#discussion_r1525110715


##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.JobSubmission;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;

Review Comment:
   You're mixing up JUnit5 and JUnit4 classes/methods. The community agreed to move forward with JUnit5/AssertJ for new tests. It's therefore advised to stick to these dependencies for newly added tests.



##
flink-end-to-end-tests/flink-batch-sql-test/src/test/java/org/apache/flink/sql/tests/BatchSQLTest.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.resources.ResourceTestUtils;
+import org.apache.flink.test.util.JobSubmission;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End-to-End tests for Batch SQL tests. */
+@RunWith(Parameterized.class)
+public class BatchSQLTest extends TestLogger {
+private static final Logger LOG = LoggerFactory.getLogger(BatchSQLTest.class);
+private static final Path testJar = ResourceTestUtils.getResource("/BatchSQLTestProgram.jar");
+private static final String testMainClass = "org.apache.flink.sql.tests.BatchSQLTestProgram";
+private static final Path sqlPath =
+ResourceTestUtils.getResource("resources/sql-job-query.sql");
+
+@Parameterized.Parameters(name = "shuffleType[{0}]")
+public static Collection shuffleTypes() {
+return Arrays.asList("blocking", "hybrid_full", "hybrid_selective");
+}
+
+@Rule public final 

Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-11 Thread via GitHub


affo commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-1988639429

   @wuchong @XComp 
   
   Hi guys, here is the PR for https://issues.apache.org/jira/browse/FLINK-20398.
   
   I decided to go for `LocalStandaloneFlinkResourceFactory`, as it is already used and is part of `flink-end-to-end-tests-common`. Other options were:
- `MiniCluster`
- `FlinkContainers` (testcontainers-based)
   Just tell me if you would rather see one of those implementations for some reason.
   
   Important concern:
   This test used to be part of `run-nightly-tests.sh`, but now I think it would run differently.
   
   Should it still run nightly?





Re: [PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-08 Thread via GitHub


flinkbot commented on PR #24471:
URL: https://github.com/apache/flink/pull/24471#issuecomment-1985868527

   
   ## CI report:
   
   * 35acd12f6acdd8c132f3dbc365c5c24549b7bc18 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework [flink]

2024-03-08 Thread via GitHub


affo opened a new pull request, #24471:
URL: https://github.com/apache/flink/pull/24471

   ## What is the purpose of the change
   
   Migrate `test_batch_sql.sh` to the Java end-to-end test framework.
   
   
   ## Brief change log
   
- implement `BatchSQLTest` porting `test_batch_sql.sh`
- fix issue in getting job ID in `FlinkDistribution`
- remove `test_batch_sql.sh` script
- remove `test_batch_sql.sh` invocations from `run-nightly-tests.sh`
   
   Question: should `run-nightly-tests.sh` now run this Java version?
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - Added integration tests for end-to-end batch mode SQL query execution
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? NA
   

