[ https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695410#comment-14695410 ]
ASF GitHub Bot commented on FLINK-2480: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/991#discussion_r36990759 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}. 
+ */ +public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> { + + private Environment envForPrefixNull = new Environment() { + @Override + public JobID getJobID() { + return null; + } + + @Override + public JobVertexID getJobVertexId() { + return null; + } + + @Override + public ExecutionAttemptID getExecutionId() { + return null; + } + + @Override + public Configuration getTaskConfiguration() { + return null; + } + + @Override + public TaskManagerRuntimeInfo getTaskManagerInfo() { + return null; + } + + @Override + public Configuration getJobConfiguration() { + return null; + } + + @Override + public int getNumberOfSubtasks() { + return 0; + } + + @Override + public int getIndexInSubtaskGroup() { + return 0; + } + + @Override + public InputSplitProvider getInputSplitProvider() { + return null; + } + + @Override + public IOManager getIOManager() { + return null; + } + + @Override + public MemoryManager getMemoryManager() { + return null; + } + + @Override + public String getTaskName() { + return null; + } + + @Override + public String getTaskNameWithSubtasks() { + return null; + } + + @Override + public ClassLoader getUserClassLoader() { + return null; + } + + @Override + public Map<String, Future<Path>> getDistributedCacheEntries() { + return null; + } + + @Override + public BroadcastVariableManager getBroadcastVariableManager() { + return null; + } + + @Override + public AccumulatorRegistry getAccumulatorRegistry() { + return null; + } + + @Override + public void acknowledgeCheckpoint(long checkpointId) { + + } + + @Override + public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) { + + } + + @Override + public ResultPartitionWriter getWriter(int index) { + return null; + } + + @Override + public ResultPartitionWriter[] getAllWriters() { + return new ResultPartitionWriter[0]; + } + + @Override + public InputGate getInputGate(int index) { + return null; + } + + @Override + public InputGate[] getAllInputGates() { + return new 
InputGate[0]; + } + }; + + @Test + public void testPrintSinkStdOut(){ + + ExecutionConfig executionConfig = new ExecutionConfig(); + final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>(); + final StreamingRuntimeContext ctx = new StreamingRuntimeContext( + "Test Print Sink", + this.envForPrefixNull, + null, + executionConfig, + null, + null, + accumulators + ); + + PrintSinkFunction<String> printSink = new PrintSinkFunction<String>(); + printSink.setRuntimeContext(ctx); + try { + printSink.open(new Configuration()); + } catch (Exception e) { + e.printStackTrace(); + } + printSink.setTargetToStandardOut(); + printSink.invoke("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); --- End diff -- You're not testing whether the sink actually prints to stdout. You're just checking if the `toString()` method returns the correct mode. > Improving tests coverage for org.apache.flink.streaming.api > ----------------------------------------------------------- > > Key: FLINK-2480 > URL: https://issues.apache.org/jira/browse/FLINK-2480 > Project: Flink > Issue Type: Test > Components: Streaming > Affects Versions: 0.10 > Reporter: Huang Wei > Fix For: 0.10 > > Original Estimate: 504h > Remaining Estimate: 504h > > The streaming API is quite a bit newer than the other code so it is not that > well covered with tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332)