This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e89818  Revert "[FLINK-19850] Add e2e tests for the new FileSink in streaming mode"
3e89818 is described below

commit 3e89818414728a4c90b7335d2fd2566477b9274a
Author: Robert Metzger <[email protected]>
AuthorDate: Fri Nov 6 18:36:01 2020 +0100

    Revert "[FLINK-19850] Add e2e tests for the new FileSink in streaming mode"
    
    This reverts commit dfd2a55065e228b973f9e2343b6252ca308e5398.
---
 .../pom.xml                                        | 16 +++-----
 .../src/main/java/StreamingFileSinkProgram.java}   | 46 +++++++---------------
 flink-end-to-end-tests/pom.xml                     |  2 +-
 flink-end-to-end-tests/run-nightly-tests.sh        |  7 +---
 ...st_file_sink.sh => test_streaming_file_sink.sh} | 12 +++---
 5 files changed, 28 insertions(+), 55 deletions(-)

diff --git a/flink-end-to-end-tests/flink-file-sink-test/pom.xml b/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
similarity index 83%
rename from flink-end-to-end-tests/flink-file-sink-test/pom.xml
rename to flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
index d14a5f8..6d309e2 100644
--- a/flink-end-to-end-tests/flink-file-sink-test/pom.xml
+++ b/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
@@ -27,8 +27,8 @@ under the License.
 
        <modelVersion>4.0.0</modelVersion>
 
-       <artifactId>flink-file-sink-test</artifactId>
-       <name>Flink : E2E Tests : File sink</name>
+       <artifactId>flink-streaming-file-sink-test</artifactId>
+       <name>Flink : E2E Tests : Streaming file sink</name>
 
        <dependencies>
                <dependency>
@@ -37,12 +37,6 @@ under the License.
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-files</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
        </dependencies>
 
        <build>
@@ -52,16 +46,16 @@ under the License.
                                <artifactId>maven-shade-plugin</artifactId>
                                <executions>
                                        <execution>
-                                               <id>FileSinkTestProgram</id>
+                                               <id>StreamingFileSinkSinkTestProgram</id>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>shade</goal>
                                                </goals>
                                                <configuration>
-                                                       <finalName>FileSinkProgram</finalName>
+                                                       <finalName>StreamingFileSinkProgram</finalName>
                                                        <transformers>
                                                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                                                       <mainClass>FileSinkProgram</mainClass>
+                                                                       <mainClass>StreamingFileSinkProgram</mainClass>
                                                                </transformer>
                                                        </transformers>
                                                </configuration>
diff --git a/flink-end-to-end-tests/flink-file-sink-test/src/main/java/FileSinkProgram.java b/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
similarity index 79%
rename from flink-end-to-end-tests/flink-file-sink-test/src/main/java/FileSinkProgram.java
rename to flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
index 62b0edc..c9cf6d7 100644
--- a/flink-end-to-end-tests/flink-file-sink-test/src/main/java/FileSinkProgram.java
+++ b/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
@@ -24,13 +24,11 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@@ -42,7 +40,7 @@ import java.io.PrintStream;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Test program for the {@link StreamingFileSink} and {@link FileSink}.
+ * Test program for the {@link StreamingFileSink}.
  *
  * <p>Uses a source that steadily emits a deterministic set of records over 60 seconds,
  * after which it idles and waits for job cancellation. Every record has a unique index that is
@@ -52,13 +50,12 @@ import java.util.concurrent.TimeUnit;
  * Adding all committed part files together, and numerically sorting the contents, should
  * result in a complete sequence from 0 (inclusive) to 60000 (exclusive).
  */
-public enum FileSinkProgram {
+public enum StreamingFileSinkProgram {
        ;
 
        public static void main(final String[] args) throws Exception {
                final ParameterTool params = ParameterTool.fromArgs(args);
                final String outputPath = params.getRequired("outputPath");
-               final String sinkToTest = params.getRequired("sinkToTest");
 
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -66,37 +63,24 @@ public enum FileSinkProgram {
                env.enableCheckpointing(5000L);
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(10L, TimeUnit.SECONDS)));
 
+               final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
+                       .forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
+                               PrintStream out = new PrintStream(stream);
+                               out.println(element.f1);
+                       })
+                       .withBucketAssigner(new KeyBucketAssigner())
+                       .withRollingPolicy(OnCheckpointRollingPolicy.build())
+                       .build();
+
                // generate data, shuffle, sink
-               DataStream<Tuple2<Integer, Integer>> source = env.addSource(new Generator(10, 10, 60));
-
-               if (sinkToTest.equalsIgnoreCase("StreamingFileSink")) {
-                       final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
-                                       .forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
-                                               PrintStream out = new PrintStream(stream);
-                                               out.println(element.f1);
-                                       })
-                                       .withBucketAssigner(new KeyBucketAssigner())
-                                       .withRollingPolicy(OnCheckpointRollingPolicy.build())
-                                       .build();
-
-                       source.keyBy(0).addSink(sink);
-               } else if (sinkToTest.equalsIgnoreCase("FileSink")){
-                       FileSink<Tuple2<Integer, Integer>> sink = FileSink
-                                       .forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
-                                               PrintStream out = new PrintStream(stream);
-                                               out.println(element.f1);
-                                       })
-                                       .withBucketAssigner(new KeyBucketAssigner())
-                                       .withRollingPolicy(OnCheckpointRollingPolicy.build())
-                                       .build();
-                       source.keyBy(0).sinkTo(sink);
-               } else {
-                       throw new UnsupportedOperationException("Unsupported sink type: " + sinkToTest);
-               }
+               env.addSource(new Generator(10, 10, 60))
+                       .keyBy(0)
+                       .addSink(sink);
 
                env.execute("StreamingFileSinkProgram");
        }
 
+
        /**
         * Use first field for buckets.
         */
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 2b6e1be..1eb78e5 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -68,7 +68,7 @@ under the License.
                <module>flink-confluent-schema-registry</module>
                <module>flink-stream-state-ttl-test</module>
                <module>flink-sql-client-test</module>
-               <module>flink-file-sink-test</module>
+               <module>flink-streaming-file-sink-test</module>
                <module>flink-state-evolution-test</module>
                <module>flink-rocksdb-state-memory-control-test</module>
                <module>flink-end-to-end-tests-common</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index d3a318a..fa4e656 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -185,11 +185,8 @@ run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sq
 run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
 run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
 
-run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
-run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"
-run_test "New File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh local FileSink" "skip_check_exceptions"
-run_test "New File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3 FileSink" "skip_check_exceptions"
-
+run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" "skip_check_exceptions"
+run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3" "skip_check_exceptions"
 run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
 
 run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"
diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
similarity index 94%
rename from flink-end-to-end-tests/test-scripts/test_file_sink.sh
rename to flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
index 5965c07..0880f62 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
@@ -18,9 +18,8 @@
 ################################################################################
 
 OUT_TYPE="${1:-local}" # other type: s3
-SINK_TO_TEST="${2:-"StreamingFileSink"}"
 
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
+S3_PREFIX=temp/test_streaming_file_sink-$(uuidgen)
 OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
 S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
 source "$(dirname "$0")"/common.sh
@@ -61,7 +60,7 @@ if [ "${OUT_TYPE}" == "s3" ]; then
   on_exit out_cleanup
 fi
 
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
 
 ###################################
 # Get all lines in part files and sort them numerically.
@@ -136,7 +135,7 @@ function wait_for_complete_result {
     done
 }
 
-function run_file_sink_test {
+function run_streaming_file_sink_test {
   start_cluster
 
   "${FLINK_DIR}/bin/taskmanager.sh" start
@@ -144,8 +143,7 @@ function run_file_sink_test {
   "${FLINK_DIR}/bin/taskmanager.sh" start
 
   echo "Submitting job."
-  CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${JOB_OUTPUT_PATH}" \
-    --sinkToTest "${SINK_TO_TEST}")
+  CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${JOB_OUTPUT_PATH}")
   JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
   if [[ -z $JOB_ID ]]; then
@@ -189,4 +187,4 @@ function run_file_sink_test {
 }
 
 # usual runtime is ~6 minutes
-run_test_with_timeout 900 run_file_sink_test
+run_test_with_timeout 900 run_streaming_file_sink_test

Reply via email to