[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308137#comment-16308137 ]
ASF GitHub Bot commented on FLINK-8268: --------------------------------------- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159220879 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Utility class to simulate in memory file like writes, flushes and closing. + */ +public class ContentDump { + private boolean writable = true; + private Map<String, List<String>> filesContent = new HashMap<>(); + + public Set<String> listFiles() { + return filesContent.keySet(); + } + + public void setWritable(boolean writable) { + this.writable = writable; + } + + /** + * Creates an empty file. 
+ */ + public ContentWriter createWriter(String name) { + checkArgument(!filesContent.containsKey(name), "File [%s] already exists", name); + filesContent.put(name, new ArrayList<>()); + return new ContentWriter(name, this); + } + + public static void move(String name, ContentDump source, ContentDump target) { + Collection<String> content = source.read(name); + try (ContentWriter contentWriter = target.createWriter(name)) { + contentWriter.write(content).flush(); + } + source.delete(name); + } + + public void delete(String name) { + filesContent.remove(name); + } + + public Collection<String> read(String name) { + List<String> content = filesContent.get(name); + checkState(content != null, "Unknown file [%s]", name); + List<String> result = new ArrayList<>(content); + content.clear(); --- End diff -- Hmm, I was thinking more along the lines of reading from a file descriptor, where subsequent reads return only new data. But now that I think about it, that would be the more expected behaviour for a `ContentReader` class, and dropping `content.clear()` here makes more sense. > Test instability for 'TwoPhaseCommitSinkFunctionTest' > ----------------------------------------------------- > > Key: FLINK-8268 > URL: https://issues.apache.org/jira/browse/FLINK-8268 > Project: Flink > Issue Type: Bug > Components: Tests > Affects Versions: 1.5.0 > Reporter: Stephan Ewen > Assignee: Piotr Nowojski > Priority: Critical > Labels: test-stability > > The following exception / failure message occurs. > {code} > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< > FAILURE! - in > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest > testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest) > Time elapsed: 0.068 sec <<< ERROR! > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1). 
> at java.io.FileOutputStream.writeBytes(Native Method) > at java.io.FileOutputStream.write(FileOutputStream.java:326) > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291) > at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295) > at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141) > at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229) > at java.io.BufferedWriter.flush(BufferedWriter.java:254) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)