[core] Removed redundant common.io.FormatUtil
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d5d63f7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d5d63f7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d5d63f7 Branch: refs/heads/master Commit: 3d5d63f7f80f3dfb353a86b10cfbed653b24421f Parents: 61b1c0a Author: Stephan Ewen <[email protected]> Authored: Sun Jul 12 19:42:49 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Jul 13 15:14:41 2015 +0200 ---------------------------------------------------------------------- .../apache/flink/api/common/io/FormatUtil.java | 186 ------------------- .../api/common/io/SerializedFormatTest.java | 21 ++- 2 files changed, 15 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3d5d63f7/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java deleted file mode 100644 index 83860a5..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.io; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.BlockLocation; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.FileStatus; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.fs.FileSystem.WriteMode; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.util.ReflectionUtil; - -/** - * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}. - */ -public class FormatUtil { - - - /** - * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration} - * initializes the format. - * - * @param <T> - * the class of the InputFormat - * @param inputFormatClass - * the class of the InputFormat - * @param path - * the path of the file - * @param configuration - * optional configuration of the InputFormat - * @return the created {@link InputFormat} - * @throws IOException - * if an I/O error occurred while accessing the file or initializing the InputFormat. - */ - public static <T, F extends FileInputFormat<T>> F openInput( - Class<F> inputFormatClass, String path, Configuration configuration) - throws IOException - { - configuration = configuration == null ? new Configuration() : configuration; - - Path normalizedPath = normalizePath(new Path(path)); - final F inputFormat = ReflectionUtil.newInstance(inputFormatClass); - - inputFormat.setFilePath(normalizedPath); - inputFormat.setOpenTimeout(0); - inputFormat.configure(configuration); - - final FileSystem fs = FileSystem.get(normalizedPath.toUri()); - FileStatus fileStatus = fs.getFileStatus(normalizedPath); - - BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); - inputFormat.open(new FileInputSplit(0, new Path(path), 0, fileStatus.getLen(), blocks[0].getHosts())); - return inputFormat; - } - - /** - * Creates {@link InputFormat}s from a given class for the specified file(s). The optional {@link Configuration} - * initializes the formats. - * - * @param <T> - * the class of the InputFormat - * @param inputFormatClass - * the class of the InputFormat - * @param path - * the path of the file or to the directory containing the splits - * @param configuration - * optional configuration of the InputFormat - * @return the created {@link InputFormat}s for each file in the specified path - * @throws IOException - * if an I/O error occurred while accessing the files or initializing the InputFormat. - */ - @SuppressWarnings("unchecked") - public static <T, F extends FileInputFormat<T>> List<F> openAllInputs( - Class<F> inputFormatClass, String path, Configuration configuration) throws IOException { - Path nephelePath = new Path(path); - FileSystem fs = nephelePath.getFileSystem(); - FileStatus fileStatus = fs.getFileStatus(nephelePath); - if (!fileStatus.isDir()) { - return Arrays.asList(openInput(inputFormatClass, path, configuration)); - } - FileStatus[] list = fs.listStatus(nephelePath); - List<F> formats = new ArrayList<F>(); - for (int index = 0; index < list.length; index++) { - formats.add(openInput(inputFormatClass, list[index].getPath().toString(), configuration)); - } - return formats; - } - - /** - * Creates an {@link InputFormat} from a given class. The optional {@link Configuration} - * initializes the format. - * - * @param <T> - * the class of the InputFormat - * @param inputFormatClass - * the class of the InputFormat - * @param configuration - * optional configuration of the InputFormat - * @return the created {@link InputFormat} - * @throws IOException - * if an I/O error occurred while accessing the file or initializing the InputFormat. - */ - public static <T, IS extends InputSplit, F extends InputFormat<T, IS>> F openInput( - Class<F> inputFormatClass, Configuration configuration) throws IOException { - configuration = configuration == null ? new Configuration() : configuration; - - final F inputFormat = ReflectionUtil.newInstance(inputFormatClass); - inputFormat.configure(configuration); - final IS[] splits = inputFormat.createInputSplits(1); - inputFormat.open(splits[0]); - return inputFormat; - } - - /** - * Creates an {@link OutputFormat} from a given class for the specified file. The optional {@link Configuration} - * initializes the format. - * - * @param <T> - * the class of the OutputFormat - * @param outputFormatClass - * the class of the OutputFormat - * @param path - * the path of the file or to the directory containing the splits - * @param configuration - * optional configuration of the OutputFormat - * @return the created {@link OutputFormat} - * @throws IOException - * if an I/O error occurred while accessing the file or initializing the OutputFormat. - */ - public static <T, F extends FileOutputFormat<? extends T>> F openOutput( - Class<F> outputFormatClass, String path, Configuration configuration) - throws IOException - { - final F outputFormat = ReflectionUtil.newInstance(outputFormatClass); - outputFormat.setOutputFilePath(new Path(path)); - outputFormat.setWriteMode(WriteMode.OVERWRITE); - - configuration = configuration == null ? new Configuration() : configuration; - - outputFormat.configure(configuration); - outputFormat.open(0, 1); - return outputFormat; - } - - /** - * Fixes the path if it denotes a local (relative) file without the proper protocol prefix. - */ - private static Path normalizePath(Path path) { - URI uri = path.toUri(); - if (uri.getScheme() == null) { - try { - uri = new URI("file", uri.getHost(), uri.getPath(), uri.getFragment()); - path = new Path(uri.toString()); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("path is invalid", e); - } - } - return path; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3d5d63f7/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java index e421f4f..d82623b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java @@ -19,10 +19,13 @@ package org.apache.flink.api.common.io; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.IntValue; import org.apache.flink.types.Record; import org.apache.flink.types.StringValue; + import org.junit.Assert; import org.junit.Before; import org.junit.runner.RunWith; @@ -56,13 +59,19 @@ public class SerializedFormatTest extends SequentialFormatTestBase<Record> { return inputFormat; } - - @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override - protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration - configuration) throws IOException { - return FormatUtil.<Record, SerializedOutputFormat>openOutput - (SerializedOutputFormat.class, path, configuration); + protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration configuration) + throws IOException + { + final SerializedOutputFormat<Record> outputFormat = new SerializedOutputFormat<Record>(); + outputFormat.setOutputFilePath(new Path(path)); + outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); + + configuration = configuration == null ? new Configuration() : configuration; + outputFormat.configure(configuration); + outputFormat.open(0, 1); + return outputFormat; } @Override
