JingsongLi commented on a change in pull request #12657:
URL: https://github.com/apache/flink/pull/12657#discussion_r440626499



##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
##########
@@ -167,6 +186,18 @@ private static void processStream(final InputStream 
stream, final Consumer<Strin
                ).start();
        }
 
+       private static void processOutputStream(final OutputStream stream, 
final List<String> inputs) {

Review comment:
       inputs -> inputLines?

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
##########
@@ -167,6 +186,18 @@ private static void processStream(final InputStream 
stream, final Consumer<Strin
                ).start();
        }
 
+       private static void processOutputStream(final OutputStream stream, 
final List<String> inputs) {

Review comment:
       `processOutputStream(inputs)` looks confuse to me. Can we just name it 
`printLinesToProcess`?

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
##########
@@ -179,80 +174,55 @@ public void testKafka() throws Exception {
                }
        }
 
-       private void insertIntoAvroTable(ClusterController clusterController) 
throws IOException {
-               LOG.info("Executing SQL: Kafka {} JSON -> Kafka {} Avro", 
kafkaSQLVersion, kafkaSQLVersion);
-               String sqlStatement1 = "INSERT INTO AvroBothTable\n" +
-                               "  SELECT\n" +
-                               "    CAST(TUMBLE_START(rowtime, INTERVAL '1' 
HOUR) AS VARCHAR) AS event_timestamp,\n" +
-                               "    user,\n" +
-                               "    RegReplace(event.message, ' is ', ' was ') 
AS message,\n" +
-                               "    COUNT(*) AS duplicate_count\n" +
-                               "  FROM JsonSourceTable\n" +
-                               "  WHERE user IS NOT NULL\n" +
-                               "  GROUP BY\n" +
-                               "    user,\n" +
-                               "    event.message,\n" +
-                               "    TUMBLE(rowtime, INTERVAL '1' HOUR)";
-
-               clusterController.submitSQLJob(new 
SQLJobSubmission.SQLJobSubmissionBuilder(sqlStatement1)
-                               .addJar(sqlAvroJar)
-                               .addJars(apacheAvroJars)
-                               .addJar(sqlJsonJar)
-                               .addJar(sqlConnectorKafkaJar)
-                               .addJar(sqlToolBoxJar)
-                               
.setSessionEnvFile(this.sqlClientSessionConf.toAbsolutePath().toString())
-                               .build());
-       }
-
-       private void insertIntoCsvSinkTable(ClusterController 
clusterController) throws IOException {
-               LOG.info("Executing SQL: Kafka {} Avro -> Csv sink", 
kafkaSQLVersion);
-               String sqlStatement2 = "INSERT INTO CsvSinkTable\n" +
-                               "   SELECT AvroBothTable.*, RegReplace('Test 
constant folding.', 'Test', 'Success') AS constant\n" +
-                               "   FROM AvroBothTable";
-
-               clusterController.submitSQLJob(new 
SQLJobSubmission.SQLJobSubmissionBuilder(sqlStatement2)
-                               .addJar(sqlAvroJar)
-                               .addJars(apacheAvroJars)
-                               .addJar(sqlJsonJar)
-                               .addJar(sqlConnectorKafkaJar)
-                               .addJar(sqlToolBoxJar)
-                               
.setSessionEnvFile(this.sqlClientSessionConf.toAbsolutePath().toString())
-                               .build()
-               );
+       private void executeSqlStatements(ClusterController clusterController, 
List<String> sqlLines) throws IOException {
+               LOG.info("Executing Kafka {} end-to-end SQL statements.", 
kafkaSQLVersion);
+               clusterController.submitSQLJob(new 
SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                       .addJar(sqlAvroJar)
+                       .addJars(apacheAvroJars)
+                       .addJar(sqlConnectorKafkaJar)
+                       .addJar(sqlToolBoxJar)
+                       .build());
        }
 
-       private String initializeSessionYaml(Map<String, String> vars) throws 
IOException {
-               URL url = 
SQLClientKafkaITCase.class.getClassLoader().getResource(KAFKA_JSON_SOURCE_SCHEMA_YAML);
+       private List<String> initializeSqlLines(Map<String, String> vars) 
throws IOException {
+               URL url = 
SQLClientKafkaITCase.class.getClassLoader().getResource(KAFKA_E2E_SQL);
                if (url == null) {
-                       throw new 
FileNotFoundException(KAFKA_JSON_SOURCE_SCHEMA_YAML);
+                       throw new FileNotFoundException(KAFKA_E2E_SQL);
                }
 
-               String schema = FileUtils.readFileUtf8(new File(url.getFile()));
-               for (Map.Entry<String, String> var : vars.entrySet()) {
-                       schema = schema.replace(var.getKey(), var.getValue());
+               List<String> lines = Files.readAllLines(new 
File(url.getFile()).toPath());
+               List<String> result = new ArrayList<>();
+               for (String line : lines) {
+                       for (Map.Entry<String, String> var : vars.entrySet()) {
+                               line = line.replace(var.getKey(), 
var.getValue());
+                       }
+                       result.add(line);
                }
-               return schema;
+
+               return result;
        }
 
        private void checkCsvResultFile() throws Exception {
                boolean success = false;
                final Deadline deadline = 
Deadline.fromNow(Duration.ofSeconds(120));
-               while (!success && deadline.hasTimeLeft()) {
+               while (deadline.hasTimeLeft()) {
                        if (Files.exists(result)) {
-                               byte[] bytes = Files.readAllBytes(result);
-                               String[] lines = new String(bytes, 
Charsets.UTF_8).split("\n");
-                               if (lines.length == 4) {
+                               List<String> lines = readCsvResultFiles(result);
+                               if (lines.size() == 4) {
                                        success = true;
                                        assertThat(
-                                               lines,
+                                               lines.toArray(new String[0]),
                                                arrayContainingInAnyOrder(
-                                                       "2018-03-12 
08:00:00.000,Alice,This was a warning.,2,Success constant folding.",
-                                                       "2018-03-12 
09:00:00.000,Bob,This was another warning.,1,Success constant folding.",
-                                                       "2018-03-12 
09:00:00.000,Steve,This was another info.,2,Success constant folding.",
-                                                       "2018-03-12 
09:00:00.000,Alice,This was a info.,1,Success constant folding."
+                                                       "\"2018-03-12 
08:00:00.000\",Alice,\"This was a warning.\",2,\"Success constant folding.\"",

Review comment:
       Add `disable-quote-character` to remove `"`?

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
##########
@@ -89,6 +95,11 @@ public AutoClosableProcessBuilder setStderrProcessor(final 
Consumer<String> stde
                        return this;
                }
 
+               public AutoClosableProcessBuilder setStdInputs(final 
List<String> inputLines) {

Review comment:
       inputLines can be `String...`

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
##########
@@ -167,6 +186,18 @@ private static void processStream(final InputStream 
stream, final Consumer<Strin
                ).start();
        }
 
+       private static void processOutputStream(final OutputStream stream, 
final List<String> inputs) {
+               new Thread(() -> {

Review comment:
       I don't think we need start a thread.
   The inputs are something like commands, should print them blocking.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to