JingsongLi commented on a change in pull request #12657:
URL: https://github.com/apache/flink/pull/12657#discussion_r440626499
##########
File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
##########

@@ -167,6 +186,18 @@ private static void processStream(final InputStream stream, final Consumer<Strin
 		).start();
 	}
 
+	private static void processOutputStream(final OutputStream stream, final List<String> inputs) {

Review comment:
       inputs -> inputLines?


##########
File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
##########

@@ -167,6 +186,18 @@ private static void processStream(final InputStream stream, final Consumer<Strin
 		).start();
 	}
 
+	private static void processOutputStream(final OutputStream stream, final List<String> inputs) {

Review comment:
       `processOutputStream(inputs)` looks confusing to me. Can we just name it `printLinesToProcess`?


##########
File path: flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
##########

@@ -179,80 +174,55 @@ public void testKafka() throws Exception {
 		}
 	}
 
-	private void insertIntoAvroTable(ClusterController clusterController) throws IOException {
-		LOG.info("Executing SQL: Kafka {} JSON -> Kafka {} Avro", kafkaSQLVersion, kafkaSQLVersion);
-		String sqlStatement1 = "INSERT INTO AvroBothTable\n" +
-			" SELECT\n" +
-			" CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp,\n" +
-			" user,\n" +
-			" RegReplace(event.message, ' is ', ' was ') AS message,\n" +
-			" COUNT(*) AS duplicate_count\n" +
-			" FROM JsonSourceTable\n" +
-			" WHERE user IS NOT NULL\n" +
-			" GROUP BY\n" +
-			" user,\n" +
-			" event.message,\n" +
-			" TUMBLE(rowtime, INTERVAL '1' HOUR)";
-
-		clusterController.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlStatement1)
-			.addJar(sqlAvroJar)
-			.addJars(apacheAvroJars)
-			.addJar(sqlJsonJar)
-			.addJar(sqlConnectorKafkaJar)
-			.addJar(sqlToolBoxJar)
-			.setSessionEnvFile(this.sqlClientSessionConf.toAbsolutePath().toString())
-			.build());
-	}
-
-	private void insertIntoCsvSinkTable(ClusterController clusterController) throws IOException {
-		LOG.info("Executing SQL: Kafka {} Avro -> Csv sink", kafkaSQLVersion);
-		String sqlStatement2 = "INSERT INTO CsvSinkTable\n" +
-			" SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant\n" +
-			" FROM AvroBothTable";
-
-		clusterController.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlStatement2)
-			.addJar(sqlAvroJar)
-			.addJars(apacheAvroJars)
-			.addJar(sqlJsonJar)
-			.addJar(sqlConnectorKafkaJar)
-			.addJar(sqlToolBoxJar)
-			.setSessionEnvFile(this.sqlClientSessionConf.toAbsolutePath().toString())
-			.build()
-		);
+	private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) throws IOException {
+		LOG.info("Executing Kafka {} end-to-end SQL statements.", kafkaSQLVersion);
+		clusterController.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+			.addJar(sqlAvroJar)
+			.addJars(apacheAvroJars)
+			.addJar(sqlConnectorKafkaJar)
+			.addJar(sqlToolBoxJar)
+			.build());
 	}
 
-	private String initializeSessionYaml(Map<String, String> vars) throws IOException {
-		URL url = SQLClientKafkaITCase.class.getClassLoader().getResource(KAFKA_JSON_SOURCE_SCHEMA_YAML);
+	private List<String> initializeSqlLines(Map<String, String> vars) throws IOException {
+		URL url = SQLClientKafkaITCase.class.getClassLoader().getResource(KAFKA_E2E_SQL);
 		if (url == null) {
-			throw new FileNotFoundException(KAFKA_JSON_SOURCE_SCHEMA_YAML);
+			throw new FileNotFoundException(KAFKA_E2E_SQL);
 		}
 
-		String schema = FileUtils.readFileUtf8(new File(url.getFile()));
-		for (Map.Entry<String, String> var : vars.entrySet()) {
-			schema = schema.replace(var.getKey(), var.getValue());
+		List<String> lines = Files.readAllLines(new File(url.getFile()).toPath());
+		List<String> result = new ArrayList<>();
+		for (String line : lines) {
+			for (Map.Entry<String, String> var : vars.entrySet()) {
+				line = line.replace(var.getKey(), var.getValue());
+			}
+			result.add(line);
 		}
-		return schema;
+
+		return result;
 	}
 
 	private void checkCsvResultFile() throws Exception {
 		boolean success = false;
 		final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
-		while (!success && deadline.hasTimeLeft()) {
+		while (deadline.hasTimeLeft()) {
 			if (Files.exists(result)) {
-				byte[] bytes = Files.readAllBytes(result);
-				String[] lines = new String(bytes, Charsets.UTF_8).split("\n");
-				if (lines.length == 4) {
+				List<String> lines = readCsvResultFiles(result);
+				if (lines.size() == 4) {
 					success = true;
 					assertThat(
-						lines,
+						lines.toArray(new String[0]),
 						arrayContainingInAnyOrder(
-							"2018-03-12 08:00:00.000,Alice,This was a warning.,2,Success constant folding.",
-							"2018-03-12 09:00:00.000,Bob,This was another warning.,1,Success constant folding.",
-							"2018-03-12 09:00:00.000,Steve,This was another info.,2,Success constant folding.",
-							"2018-03-12 09:00:00.000,Alice,This was a info.,1,Success constant folding."
+							"\"2018-03-12 08:00:00.000\",Alice,\"This was a warning.\",2,\"Success constant folding.\"",

Review comment:
       Add `disable-quote-character` to remove the `"` quoting?


##########
File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
##########

@@ -89,6 +95,11 @@ public AutoClosableProcessBuilder setStderrProcessor(final Consumer<String> stde
 		return this;
 	}
 
+	public AutoClosableProcessBuilder setStdInputs(final List<String> inputLines) {

Review comment:
       `inputLines` can be `String...`.


##########
File path: flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
##########

@@ -167,6 +186,18 @@ private static void processStream(final InputStream stream, final Consumer<Strin
 		).start();
 	}
 
+	private static void processOutputStream(final OutputStream stream, final List<String> inputs) {
+		new Thread(() -> {

Review comment:
       I don't think we need to start a thread here. The inputs are essentially commands, so we should write them to the process in a blocking way.
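To make the last two suggestions concrete, here is a minimal sketch of a varargs `setStdInputs` and a blocking write to the process' stdin. The class and method names, the small stand-in builder, and the `cat` demo in `main` are assumptions for illustration only, not the actual `AutoClosableProcess` code in this PR:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only; names and structure are assumptions, not the PR's actual code.
final class ProcessStdinSketch {

	/** Stand-in for the builder: take the input lines as varargs instead of a List. */
	static final class Builder {
		private String[] stdInputs = new String[0];

		Builder setStdInputs(final String... inputLines) {
			this.stdInputs = inputLines;
			return this;
		}
	}

	/**
	 * Writes the input lines to the process' stdin on the calling thread (blocking),
	 * so no extra thread is started; the inputs are just commands.
	 */
	static void printLinesToProcess(final OutputStream stdin, final String... inputLines) throws IOException {
		try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(stdin, StandardCharsets.UTF_8))) {
			for (String line : inputLines) {
				writer.write(line);
				writer.newLine();
			}
		}
	}

	public static void main(String[] args) throws IOException, InterruptedException {
		// Demo on a Unix-like system: pipe two lines into `cat`, which echoes them back to the console.
		Process process = new ProcessBuilder("cat")
			.inheritIO()
			.redirectInput(ProcessBuilder.Redirect.PIPE)
			.start();
		printLinesToProcess(process.getOutputStream(), "first command", "second command");
		process.waitFor();
	}
}
```

Closing the writer also closes the process' stdin, which signals end-of-input to a program that reads commands until EOF (the SQL client, for example); if stdin needs to stay open, a flush-without-close variant would be needed instead.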
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org