PARQUET-777: Add Parquet CLI. This adds a new parquet-cli module with an improved command-line tool. The parquet-cli/README.md file has instructions for building and testing locally.
Author: Ryan Blue <b...@apache.org> Author: Tom White <t...@cloudera.com> Closes #384 from rdblue/PARQUET-777-add-parquet-cli and squashes the following commits: de49eff [Ryan Blue] PARQUET-777: Move dynamic support classes, add tests. affdfb9 [Ryan Blue] PARQUET-777: Update for review feedback. f953fd4 [Ryan Blue] PARQUET-777: Update README.md with better instructions. aed223d [Tom White] Replace source file headers with Apache header. d718363 [Ryan Blue] PARQUET-777: Add Parquet CLI. Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/ddbeb4dd Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/ddbeb4dd Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/ddbeb4dd Branch: refs/heads/master Commit: ddbeb4dd17d9c219b99b1e66d8be28efe37e3aa6 Parents: df9f8d8 Author: Ryan Blue <b...@apache.org> Authored: Fri Jul 28 16:25:21 2017 -0700 Committer: Ryan Blue <b...@apache.org> Committed: Fri Jul 28 16:25:21 2017 -0700 ---------------------------------------------------------------------- NOTICE | 38 ++ parquet-cli/README.md | 107 ++++ parquet-cli/pom.xml | 153 +++++ .../org/apache/parquet/cli/BaseCommand.java | 397 ++++++++++++ .../java/org/apache/parquet/cli/Command.java | 40 ++ .../cli/HadoopFileSystemURLStreamHandler.java | 79 +++ .../main/java/org/apache/parquet/cli/Help.java | 147 +++++ .../main/java/org/apache/parquet/cli/Main.java | 178 ++++++ .../main/java/org/apache/parquet/cli/Util.java | 335 ++++++++++ .../parquet/cli/commands/CSVSchemaCommand.java | 131 ++++ .../apache/parquet/cli/commands/CatCommand.java | 106 ++++ .../cli/commands/CheckParquet251Command.java | 351 ++++++++++ .../parquet/cli/commands/ConvertCSVCommand.java | 204 ++++++ .../parquet/cli/commands/ConvertCommand.java | 165 +++++ .../cli/commands/ParquetMetadataCommand.java | 180 ++++++ .../parquet/cli/commands/SchemaCommand.java | 138 ++++ .../cli/commands/ShowDictionaryCommand.java | 131 ++++ 
.../parquet/cli/commands/ShowPagesCommand.java | 217 +++++++ .../parquet/cli/commands/ToAvroCommand.java | 141 ++++ .../org/apache/parquet/cli/csv/AvroCSV.java | 258 ++++++++ .../apache/parquet/cli/csv/AvroCSVReader.java | 121 ++++ .../apache/parquet/cli/csv/CSVProperties.java | 111 ++++ .../apache/parquet/cli/csv/RecordBuilder.java | 200 ++++++ .../org/apache/parquet/cli/json/AvroJson.java | 636 +++++++++++++++++++ .../apache/parquet/cli/json/AvroJsonReader.java | 85 +++ .../org/apache/parquet/cli/util/Codecs.java | 50 ++ .../apache/parquet/cli/util/Expressions.java | 391 ++++++++++++ .../org/apache/parquet/cli/util/Formats.java | 47 ++ .../apache/parquet/cli/util/GetClassLoader.java | 39 ++ .../parquet/cli/util/RecordException.java | 53 ++ .../parquet/cli/util/RuntimeIOException.java | 31 + .../org/apache/parquet/cli/util/Schemas.java | 498 +++++++++++++++ .../cli/util/SeekableFSDataInputStream.java | 76 +++ parquet-cli/src/main/resources/META-INF/LICENSE | 348 ++++++++++ parquet-cli/src/main/resources/META-INF/NOTICE | 45 ++ .../src/main/resources/cli-logging.properties | 51 ++ .../java/org/apache/parquet/Exceptions.java | 34 + .../apache/parquet/util/DynConstructors.java | 273 ++++++++ .../org/apache/parquet/util/DynMethods.java | 520 +++++++++++++++ .../test/java/org/apache/parquet/TestUtils.java | 70 ++ .../org/apache/parquet/util/Concatenator.java | 82 +++ .../parquet/util/TestDynConstructors.java | 235 +++++++ .../org/apache/parquet/util/TestDynMethods.java | 410 ++++++++++++ pom.xml | 7 + 44 files changed, 7909 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index a9b6c56..289b092 100644 --- a/NOTICE +++ b/NOTICE @@ -54,3 +54,41 @@ its NOTICE file: This product includes software developed at The Apache Software Foundation (http://www.apache.org/). 
+-------------------------------------------------------------------------------- + +This project includes code from Kite, developed at Cloudera, Inc. with +the following copyright notice: + +| Copyright 2013 Cloudera Inc. +| +| Licensed under the Apache License, Version 2.0 (the "License"); +| you may not use this file except in compliance with the License. +| You may obtain a copy of the License at +| +| http://www.apache.org/licenses/LICENSE-2.0 +| +| Unless required by applicable law or agreed to in writing, software +| distributed under the License is distributed on an "AS IS" BASIS, +| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +| See the License for the specific language governing permissions and +| limitations under the License. + +-------------------------------------------------------------------------------- + +This project includes code from Netflix, Inc. with the following copyright +notice: + +| Copyright 2016 Netflix, Inc. +| +| Licensed under the Apache License, Version 2.0 (the "License"); +| you may not use this file except in compliance with the License. +| You may obtain a copy of the License at +| +| http://www.apache.org/licenses/LICENSE-2.0 +| +| Unless required by applicable law or agreed to in writing, software +| distributed under the License is distributed on an "AS IS" BASIS, +| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +| See the License for the specific language governing permissions and +| limitations under the License. + http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/README.md ---------------------------------------------------------------------- diff --git a/parquet-cli/README.md b/parquet-cli/README.md new file mode 100644 index 0000000..d17d719 --- /dev/null +++ b/parquet-cli/README.md @@ -0,0 +1,107 @@ +<!-- + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. 
See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + --> + +## Building + +You can build this project using maven: + +``` +mvn clean install -DskipTests +``` + + +## Running + +The build produces a shaded Jar that can be run using the `hadoop` command: + +``` +hadoop jar parquet-cli-1.9.1-runtime.jar org.apache.parquet.cli.Main +``` + +For a shorter command-line invocation, add an alias to your shell like this: + +``` +alias parquet="hadoop jar /path/to/parquet-cli-1.9.1-runtime.jar org.apache.parquet.cli.Main --dollar-zero parquet" +``` + +### Running without Hadoop + +To run from the target directory instead of using the `hadoop` command, first copy the dependencies to a folder: + +``` +mvn dependency:copy-dependencies +``` + +Then, run the command-line and add `target/dependency/*` to the classpath: + +``` +java -cp 'target/*:target/dependency/*' org.apache.parquet.cli.Main +``` + + +### Help + +The `parquet` tool includes help for the included commands: + +``` +parquet help +``` +``` +Usage: parquet [options] [command] [command options] + + Options: + + -v, --verbose, --debug + Print extra debugging information + + Commands: + + help + Retrieves details on the functions of other commands + meta + Print a Parquet file's metadata + pages + Print page summaries for a Parquet file + dictionary + Print dictionaries for a
Parquet column + check-stats + Check Parquet files for corrupt page and column stats (PARQUET-251) + schema + Print the Avro schema for a file + csv-schema + Build a schema from a CSV data sample + convert-csv + Create a file from CSV data + convert + Create a Parquet file from a data file + to-avro + Create an Avro file from a data file + cat + Print the first N records from a file + head + Print the first N records from a file + + Examples: + + # print information for create + parquet help create + + See 'parquet help <command>' for more information on a specific command. +``` + http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/pom.xml ---------------------------------------------------------------------- diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml new file mode 100644 index 0000000..a9cd21b --- /dev/null +++ b/parquet-cli/pom.xml @@ -0,0 +1,153 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet</artifactId> + <relativePath>../pom.xml</relativePath> + <version>1.9.1-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>parquet-cli</artifactId> + <packaging>jar</packaging> + + <name>Apache Parquet Command-line</name> + <url>https://parquet.apache.org</url> + + <dependencies> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>net.sf.opencsv</groupId> + <artifactId>opencsv</artifactId> + <version>${opencsv.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson2.version}</version> + </dependency> + <dependency> + <groupId>com.beust</groupId> + <artifactId>jcommander</artifactId> + <version>${jcommander.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>${commons-codec.version}</version> + <scope>provided</scope> + </dependency> + 
<dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- This module disables semver checks because it is not a public API. + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadedArtifactAttached>true</shadedArtifactAttached> + <shadedClassifierName>runtime</shadedClassifierName> + <minimizeJar>false</minimizeJar> + <filters> + <filter> + <artifact>org.xerial.snappy:*</artifact> + <excludes> + <exclude>**/LICENSE</exclude> + </excludes> + </filter> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/LICENSE.txt</exclude> + <exclude>META-INF/NOTICE.txt</exclude> + </excludes> + </filter> + </filters> + <artifactSet> + <includes> + <include>*</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <!-- relocate Avro in the runtime jar to avoid conflicts with + on-cluster Avro versions. 
+ --> + <pattern>org.apache.avro</pattern> + <shadedPattern>${shade.prefix}.org.apache.avro</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java new file mode 100644 index 0000000..4b47164 --- /dev/null +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.cli; + +import com.beust.jcommander.internal.Lists; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.io.CharStreams; +import com.google.common.io.Resources; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumFileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.cli.json.AvroJsonReader; +import org.apache.parquet.cli.util.Formats; +import org.apache.parquet.cli.util.GetClassLoader; +import org.apache.parquet.cli.util.Schemas; +import org.apache.parquet.cli.util.SeekableFSDataInputStream; +import org.apache.parquet.hadoop.ParquetReader; +import org.slf4j.Logger; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.nio.charset.Charset; +import java.security.AccessController; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public abstract class BaseCommand implements Command, Configurable { + + @VisibleForTesting + static final Charset UTF8 = Charset.forName("utf8"); + + private static final String RESOURCE_URI_SCHEME = "resource"; + private static final String STDIN_AS_SOURCE = "stdin"; + + protected final Logger console; + + private Configuration conf = null; + private LocalFileSystem localFS = null; + + 
public BaseCommand(Logger console) { + this.console = console; + } + + /** + * @return FileSystem to use when no file system scheme is present in a path + * @throws IOException + */ + public FileSystem defaultFS() throws IOException { + if (localFS == null) { + this.localFS = FileSystem.getLocal(getConf()); + } + return localFS; + } + + /** + * Output content to the console or a file. + * + * This will not produce checksum files. + * + * @param content String content to write + * @param console A {@link Logger} for writing to the console + * @param filename The destination {@link Path} as a String + * @throws IOException + */ + public void output(String content, Logger console, String filename) + throws IOException { + if (filename == null || "-".equals(filename)) { + console.info(content); + } else { + FSDataOutputStream outgoing = create(filename); + try { + outgoing.write(content.getBytes(UTF8)); + } finally { + outgoing.close(); + } + } + } + + /** + * Creates a file and returns an open {@link FSDataOutputStream}. + * + * If the file does not have a file system scheme, this uses the default FS. + * + * This will not produce checksum files and will overwrite a file that + * already exists. + * + * @param filename The filename to create + * @return An open FSDataOutputStream + * @throws IOException + */ + public FSDataOutputStream create(String filename) throws IOException { + return create(filename, true); + } + + /** + * Creates a file and returns an open {@link FSDataOutputStream}. + * + * If the file does not have a file system scheme, this uses the default FS. + * + * This will produce checksum files and will overwrite a file that already + * exists. 
+ * + * @param filename The filename to create + * @return An open FSDataOutputStream + * @throws IOException + */ + public FSDataOutputStream createWithChecksum(String filename) + throws IOException { + return create(filename, false); + } + + private FSDataOutputStream create(String filename, boolean noChecksum) + throws IOException { + Path filePath = qualifiedPath(filename); + // even though it was qualified using the default FS, it may not be in it + FileSystem fs = filePath.getFileSystem(getConf()); + if (noChecksum && fs instanceof ChecksumFileSystem) { + fs = ((ChecksumFileSystem) fs).getRawFileSystem(); + } + return fs.create(filePath, true /* overwrite */); + } + + /** + * Returns a qualified {@link Path} for the {@code filename}. + * + * If the file does not have a file system scheme, this uses the default FS. + * + * @param filename The filename to qualify + * @return A qualified Path for the filename + * @throws IOException + */ + public Path qualifiedPath(String filename) throws IOException { + Path cwd = defaultFS().makeQualified(new Path(".")); + return new Path(filename).makeQualified(defaultFS().getUri(), cwd); + } + + /** + * Returns a {@link URI} for the {@code filename} that is a qualified Path or + * a resource URI. + * + * If the file does not have a file system scheme, this uses the default FS. + * + * @param filename The filename to qualify + * @return A qualified URI for the filename + * @throws IOException + */ + public URI qualifiedURI(String filename) throws IOException { + URI fileURI = URI.create(filename); + if (RESOURCE_URI_SCHEME.equals(fileURI.getScheme())) { + return fileURI; + } else { + return qualifiedPath(filename).toUri(); + } + } + + /** + * Opens an existing file or resource. + * + * If the file does not have a file system scheme, this uses the default FS. + * + * @param filename The filename to open. 
+ * @return An open InputStream with the file contents + * @throws IOException + * @throws IllegalArgumentException If the file does not exist + */ + public InputStream open(String filename) throws IOException { + if (STDIN_AS_SOURCE.equals(filename)) { + return System.in; + } + + URI uri = qualifiedURI(filename); + if (RESOURCE_URI_SCHEME.equals(uri.getScheme())) { + return Resources.getResource(uri.getRawSchemeSpecificPart()).openStream(); + } else { + Path filePath = new Path(uri); + // even though it was qualified using the default FS, it may not be in it + FileSystem fs = filePath.getFileSystem(getConf()); + return fs.open(filePath); + } + } + + public SeekableInput openSeekable(String filename) throws IOException { + Path path = qualifiedPath(filename); + // even though it was qualified using the default FS, it may not be in it + FileSystem fs = path.getFileSystem(getConf()); + return new SeekableFSDataInputStream(fs, path); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + HadoopFileSystemURLStreamHandler.setDefaultConf(conf); + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * Returns a {@link ClassLoader} for a set of jars and directories. + * + * @param jars A list of jar paths + * @param paths A list of directories containing .class files + * @throws MalformedURLException + */ + protected static ClassLoader loaderFor(List<String> jars, List<String> paths) + throws MalformedURLException { + return AccessController.doPrivileged(new GetClassLoader(urls(jars, paths))); + } + + /** + * Returns a {@link ClassLoader} for a set of jars. + * + * @param jars A list of jar paths + * @throws MalformedURLException + */ + protected static ClassLoader loaderForJars(List<String> jars) + throws MalformedURLException { + return AccessController.doPrivileged(new GetClassLoader(urls(jars, null))); + } + + /** + * Returns a {@link ClassLoader} for a set of directories. 
+ * + * @param paths A list of directories containing .class files + * @throws MalformedURLException + */ + protected static ClassLoader loaderForPaths(List<String> paths) + throws MalformedURLException { + return AccessController.doPrivileged(new GetClassLoader(urls(null, paths))); + } + + private static List<URL> urls(List<String> jars, List<String> dirs) + throws MalformedURLException { + // check the additional jars and lib directories in the local FS + final List<URL> urls = Lists.newArrayList(); + if (dirs != null) { + for (String lib : dirs) { + // final URLs must end in '/' for URLClassLoader + File path = lib.endsWith("/") ? new File(lib) : new File(lib + "/"); + Preconditions.checkArgument(path.exists(), + "Lib directory does not exist: " + lib); + Preconditions.checkArgument(path.isDirectory(), + "Not a directory: " + lib); + Preconditions.checkArgument(path.canRead() && path.canExecute(), + "Insufficient permissions to access lib directory: " + lib); + urls.add(path.toURI().toURL()); + } + } + if (jars != null) { + for (String jar : jars) { + File path = new File(jar); + Preconditions.checkArgument(path.exists(), + "Jar files does not exist: " + jar); + Preconditions.checkArgument(path.isFile(), + "Not a file: " + jar); + Preconditions.checkArgument(path.canRead(), + "Cannot read jar file: " + jar); + urls.add(path.toURI().toURL()); + } + } + return urls; + } + + protected <D> Iterable<D> openDataFile(final String source, Schema projection) + throws IOException { + Formats.Format format = Formats.detectFormat(open(source)); + switch (format) { + case PARQUET: + Configuration conf = new Configuration(getConf()); + // TODO: add these to the reader builder + AvroReadSupport.setRequestedProjection(conf, projection); + AvroReadSupport.setAvroReadSchema(conf, projection); + final ParquetReader<D> parquet = AvroParquetReader.<D>builder(qualifiedPath(source)) + .disableCompatibility() + .withDataModel(GenericData.get()) + .withConf(conf) + .build(); + return 
new Iterable<D>() { + @Override + public Iterator<D> iterator() { + return new Iterator<D>() { + private boolean hasNext = false; + private D next = advance(); + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public D next() { + if (!hasNext) { + throw new NoSuchElementException(); + } + D toReturn = next; + this.next = advance(); + return toReturn; + } + + private D advance() { + try { + D next = parquet.read(); + this.hasNext = (next != null); + return next; + } catch (IOException e) { + throw new RuntimeException( + "Failed while reading Parquet file: " + source, e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove is not supported"); + } + }; + } + }; + + case AVRO: + Iterable<D> avroReader = (Iterable<D>) DataFileReader.openReader( + openSeekable(source), new GenericDatumReader<>(projection)); + return avroReader; + + default: + if (source.endsWith("json")) { + return new AvroJsonReader<>(open(source), projection); + } else { + Preconditions.checkArgument(projection == null, + "Cannot select columns from text files"); + Iterable text = CharStreams.readLines(new InputStreamReader(open(source))); + return text; + } + } + } + + protected Schema getAvroSchema(String source) throws IOException { + Formats.Format format; + try (SeekableInput in = openSeekable(source)) { + format = Formats.detectFormat((InputStream) in); + in.seek(0); + + switch (format) { + case PARQUET: + return Schemas.fromParquet(getConf(), qualifiedURI(source)); + case AVRO: + return Schemas.fromAvro(open(source)); + case TEXT: + if (source.endsWith("avsc")) { + return Schemas.fromAvsc(open(source)); + } else if (source.endsWith("json")) { + return Schemas.fromJSON("json", open(source)); + } + default: + } + + throw new IllegalArgumentException(String.format( + "Could not determine file format of %s.", source)); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java new file mode 100644 index 0000000..9c19143 --- /dev/null +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cli; + +import java.io.IOException; +import java.util.List; + +public interface Command { + /** + * Runs this {@code Command}. + * + * @return a return code for the process, 0 indicates success. + * @throws IOException + */ + int run() throws IOException; + + /** + * Returns a list of example uses. Lines starting with '#' will not have the + * executable name added when formatting. 
+ * + * @return a list of String examples + */ + List<String> getExamples(); +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java b/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java new file mode 100644 index 0000000..548544a --- /dev/null +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cli; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; +import java.net.URLStreamHandler; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * A {@link URLStreamHandler} for handling Hadoop filesystem URLs, + * most commonly those with the <i>hdfs</i> scheme. 
+ */ +public class HadoopFileSystemURLStreamHandler extends URLStreamHandler + implements Configurable { + + private static Configuration defaultConf = new Configuration(); + + public static Configuration getDefaultConf() { + return defaultConf; + } + + public static void setDefaultConf(Configuration defaultConf) { + HadoopFileSystemURLStreamHandler.defaultConf = defaultConf; + } + + private Configuration conf = defaultConf; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + protected URLConnection openConnection(URL url) throws IOException { + return new HadoopFileSystemURLConnection(url); + } + + class HadoopFileSystemURLConnection extends URLConnection { + public HadoopFileSystemURLConnection(URL url) { + super(url); + } + @Override + public void connect() throws IOException { + } + @Override + public InputStream getInputStream() throws IOException { + Path path = new Path(url.toExternalForm()); + FileSystem fileSystem = path.getFileSystem(conf); + return fileSystem.open(path); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java new file mode 100644 index 0000000..791d169 --- /dev/null +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cli; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterDescription; +import com.beust.jcommander.Parameters; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import java.util.List; + +@Parameters(commandDescription = "Retrieves details on the functions of other commands") +public class Help implements Command { + @Parameter(description = "<commands>") + List<String> helpCommands = Lists.newArrayList(); + + private final JCommander jc; + private final Logger console; + private String programName; + + public Help(JCommander jc, Logger console) { + this.jc = jc; + this.console = console; + } + + public void setProgramName(String programName) { + this.programName = programName; + } + + @Override + public int run() { + if (helpCommands.isEmpty()) { + printGenericHelp(); + + } else { + for (String cmd : helpCommands) { + JCommander commander = jc.getCommands().get(cmd); + if (commander == null) { + console.error("\nUnknown command: {}\n", cmd); + printGenericHelp(); + return 1; + } + + boolean hasRequired = false; + console.info("\nUsage: {} [general options] {} {} [command options]", + new Object[] { + programName, cmd, + commander.getMainParameterDescription()}); + console.info("\n Description:"); + console.info("\n {}", jc.getCommandDescription(cmd)); + if (!commander.getParameters().isEmpty()) { + console.info("\n Command options:\n"); + for (ParameterDescription param : commander.getParameters()) { + hasRequired = 
printOption(console, param) || hasRequired; + } + if (hasRequired) { + console.info("\n * = required"); + } + } + List<String> examples = ((Command) commander.getObjects().get(0)).getExamples(); + if (examples != null) { + console.info("\n Examples:"); + for (String example : examples) { + if (example.startsWith("#")) { + // comment + console.info("\n {}", example); + } else { + console.info(" {} {} {}", + new Object[] {programName, cmd, example}); + } + } + } + // add an extra newline in case there are more commands + console.info(""); + } + } + return 0; + } + + /** + * Prints the top-level usage text: the general options, every registered + * command with its description, and a sample "help" invocation. + */ + public void printGenericHelp() { + boolean hasRequired = false; + console.info( + "\nUsage: {} [options] [command] [command options]", + programName); + console.info("\n Options:\n"); + for (ParameterDescription param : jc.getParameters()) { + hasRequired = printOption(console, param) || hasRequired; + } + if (hasRequired) { + console.info("\n * = required"); + } + console.info("\n Commands:\n"); + for (String command : jc.getCommands().keySet()) { + console.info(" {}\n\t{}", + command, jc.getCommandDescription(command)); + } + console.info("\n Examples:"); + // the example must name a command that is actually registered, otherwise + // following it ends in "Unknown command" + console.info("\n # print information for meta\n {} help meta", + programName); + console.info("\n See '{} help <command>' for more information on a " + + "specific command.", programName); + } + + private boolean printOption(Logger console, ParameterDescription param) { + boolean required = param.getParameter().required(); + if (!param.getParameter().hidden()) { + console.info(" {} {}\n\t{}{}", new Object[]{ + required ? "*" : " ", + param.getNames().trim(), + param.getDescription(), + formatDefault(param)}); + } + return required; + } + + private String formatDefault(ParameterDescription param) { + Object defaultValue = param.getDefault(); + if (defaultValue == null || param.getParameter().arity() < 1) { + return ""; + } + return " (default: " + ((defaultValue instanceof String) ? 
+ "\"" + defaultValue + "\"" : + defaultValue.toString()) + ")"; + } + + @Override + public List<String> getExamples() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java new file mode 100644 index 0000000..990193c --- /dev/null +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.cli; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.MissingCommandException; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.beust.jcommander.Parameters; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import org.apache.parquet.cli.commands.CSVSchemaCommand; +import org.apache.parquet.cli.commands.CatCommand; +import org.apache.parquet.cli.commands.CheckParquet251Command; +import org.apache.parquet.cli.commands.ConvertCSVCommand; +import org.apache.parquet.cli.commands.ConvertCommand; +import org.apache.parquet.cli.commands.ParquetMetadataCommand; +import org.apache.parquet.cli.commands.SchemaCommand; +import org.apache.parquet.cli.commands.ShowDictionaryCommand; +import org.apache.parquet.cli.commands.ShowPagesCommand; +import org.apache.parquet.cli.commands.ToAvroCommand; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Level; +import org.apache.log4j.PropertyConfigurator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Set; + +@Parameters(commandDescription = "Parquet file utils") +public class Main extends Configured implements Tool { + + @Parameter(names = {"-v", "--verbose", "--debug"}, + description = "Print extra debugging information") + private boolean debug = false; + + @VisibleForTesting + @Parameter(names="--dollar-zero", + description="A way for the runtime path to be passed in", hidden=true) + String programName = DEFAULT_PROGRAM_NAME; + + @VisibleForTesting + static final String DEFAULT_PROGRAM_NAME = "parquet"; + + private static Set<String> HELP_ARGS = ImmutableSet.of("-h", "-help", "--help", "help"); + + private final 
Logger console; + private final Help help; + + @VisibleForTesting + final JCommander jc; + + Main(Logger console) { + this.console = console; + this.jc = new JCommander(this); + this.help = new Help(jc, console); + jc.setProgramName(DEFAULT_PROGRAM_NAME); + jc.addCommand("help", help, "-h", "-help", "--help"); + jc.addCommand("meta", new ParquetMetadataCommand(console)); + jc.addCommand("pages", new ShowPagesCommand(console)); + jc.addCommand("dictionary", new ShowDictionaryCommand(console)); + jc.addCommand("check-stats", new CheckParquet251Command(console)); + jc.addCommand("schema", new SchemaCommand(console)); + jc.addCommand("csv-schema", new CSVSchemaCommand(console)); + jc.addCommand("convert-csv", new ConvertCSVCommand(console)); + jc.addCommand("convert", new ConvertCommand(console)); + jc.addCommand("to-avro", new ToAvroCommand(console)); + jc.addCommand("cat", new CatCommand(console, 0)); + jc.addCommand("head", new CatCommand(console, 10)); + } + + @Override + public int run(String[] args) throws Exception { + try { + jc.parse(args); + } catch (MissingCommandException e) { + console.error(e.getMessage()); + return 1; + } catch (ParameterException e) { + help.setProgramName(programName); + String cmd = jc.getParsedCommand(); + if (args.length == 1) { // i.e., just the command (missing required arguments) + help.helpCommands.add(cmd); + help.run(); + return 1; + } else { // check for variants like 'cmd --help' etc. 
+ for (String arg : args) { + if (HELP_ARGS.contains(arg)) { + help.helpCommands.add(cmd); + help.run(); + return 0; + } + } + } + console.error(e.getMessage()); + return 1; + } + + help.setProgramName(programName); + + // configure log4j + if (debug) { + org.apache.log4j.Logger console = org.apache.log4j.Logger.getLogger(Main.class); + console.setLevel(Level.DEBUG); + } + + String parsed = jc.getParsedCommand(); + if (parsed == null) { + help.run(); + return 1; + } else if ("help".equals(parsed)) { + return help.run(); + } + + Command command = (Command) jc.getCommands().get(parsed).getObjects().get(0); + if (command == null) { + help.run(); + return 1; + } + + try { + if (command instanceof Configurable) { + ((Configurable) command).setConf(getConf()); + } + return command.run(); + } catch (IllegalArgumentException e) { + if (debug) { + console.error("Argument error", e); + } else { + console.error("Argument error: {}", e.getMessage()); + } + return 1; + } catch (IllegalStateException e) { + if (debug) { + console.error("State error", e); + } else { + console.error("State error: {}", e.getMessage()); + } + return 1; + } catch (Exception e) { + console.error("Unknown error", e); + return 1; + } + } + + public static void main(String[] args) throws Exception { + // reconfigure logging with the kite CLI configuration + PropertyConfigurator.configure( + Main.class.getResource("/cli-logging.properties")); + Logger console = LoggerFactory.getLogger(Main.class); + // use Log4j for any libraries using commons-logging + LogFactory.getFactory().setAttribute( + "org.apache.commons.logging.Log", + "org.apache.commons.logging.impl.Log4JLogger"); + int rc = ToolRunner.run(new Configuration(), new Main(console), args); + System.exit(rc); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java ---------------------------------------------------------------------- diff --git 
a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java new file mode 100644 index 0000000..860a218 --- /dev/null +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.cli; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import org.apache.commons.codec.binary.Hex; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.EncodingStats; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.parquet.column.statistics.BooleanStatistics; +import org.apache.parquet.column.statistics.DoubleStatistics; +import org.apache.parquet.column.statistics.FloatStatistics; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.LongStatistics; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import java.nio.charset.StandardCharsets; +import java.util.Locale; +import java.util.Set; + +import static org.apache.parquet.column.Encoding.BIT_PACKED; +import static org.apache.parquet.column.Encoding.DELTA_BINARY_PACKED; +import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY; +import static org.apache.parquet.column.Encoding.PLAIN; +import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY; +import static org.apache.parquet.column.Encoding.RLE; +import static org.apache.parquet.column.Encoding.RLE_DICTIONARY; +import static org.apache.parquet.format.Encoding.DELTA_LENGTH_BYTE_ARRAY; + + +public class Util { + + private static final long KB = 1024; + private static final long MB = 1024 * KB; + private static final long GB = 1024 * MB; + private static final long TB = 1024 * GB; + + public static String humanReadable(float bytes) { + if (bytes > TB) { + return String.format("%.03f 
TB", bytes / TB); + } else if (bytes > GB) { + return String.format("%.03f GB", bytes / GB); + } else if (bytes > MB) { + return String.format("%.03f MB", bytes / MB); + } else if (bytes > KB) { + return String.format("%.03f kB", bytes / KB); + } else { + return String.format("%.02f B", bytes); + } + } + + public static String humanReadable(long bytes) { + if (bytes > TB) { + return String.format("%.03f TB", ((float) bytes) / TB); + } else if (bytes > GB) { + return String.format("%.03f GB", ((float) bytes) / GB); + } else if (bytes > MB) { + return String.format("%.03f MB", ((float) bytes) / MB); + } else if (bytes > KB) { + return String.format("%.03f kB", ((float) bytes) / KB); + } else { + return String.format("%d B", bytes); + } + } + + public static String minMaxAsString(Statistics stats, OriginalType annotation) { + if (stats == null) { + return "no stats"; + } + if (!stats.hasNonNullValue()) { + return ""; + } + // TODO: use original types when showing decimal, timestamp, etc. + if (stats instanceof BooleanStatistics) { + return String.format("%s / %s", + ((BooleanStatistics) stats).getMin(), + ((BooleanStatistics) stats).getMax()); + } else if (stats instanceof IntStatistics) { + return String.format("%d / %d", + ((IntStatistics) stats).getMin(), + ((IntStatistics) stats).getMax()); + } else if (stats instanceof LongStatistics) { + return String.format("%d / %d", + ((LongStatistics) stats).getMin(), + ((LongStatistics) stats).getMax()); + } else if (stats instanceof FloatStatistics) { + return String.format("%f / %f", + ((FloatStatistics) stats).getMin(), + ((FloatStatistics) stats).getMax()); + } else if (stats instanceof DoubleStatistics) { + return String.format("%f / %f", + ((DoubleStatistics) stats).getMin(), + ((DoubleStatistics) stats).getMax()); + } else if (stats instanceof BinaryStatistics) { + byte[] minBytes = stats.getMinBytes(); + byte[] maxBytes = stats.getMaxBytes(); + return String.format("%s / %s", + printable(minBytes, annotation == 
OriginalType.UTF8, 30), + printable(maxBytes, annotation == OriginalType.UTF8, 30)); + } else { + throw new RuntimeException("Unknown stats type: " + stats); + } + } + + public static String toString(Statistics stats, long count, OriginalType annotation) { + if (stats == null) { + return "no stats"; + } + // TODO: use original types when showing decimal, timestamp, etc. + if (stats instanceof BooleanStatistics) { + return String.format("nulls: %d/%d", stats.getNumNulls(), count); + } else if (stats instanceof IntStatistics) { + return String.format("min: %d max: %d nulls: %d/%d", + ((IntStatistics) stats).getMin(), ((IntStatistics) stats).getMax(), + stats.getNumNulls(), count); + } else if (stats instanceof LongStatistics) { + return String.format("min: %d max: %d nulls: %d/%d", + ((LongStatistics) stats).getMin(), ((LongStatistics) stats).getMax(), + stats.getNumNulls(), count); + } else if (stats instanceof FloatStatistics) { + return String.format("min: %f max: %f nulls: %d/%d", + ((FloatStatistics) stats).getMin(), + ((FloatStatistics) stats).getMax(), + stats.getNumNulls(), count); + } else if (stats instanceof DoubleStatistics) { + return String.format("min: %f max: %f nulls: %d/%d", + ((DoubleStatistics) stats).getMin(), + ((DoubleStatistics) stats).getMax(), + stats.getNumNulls(), count); + } else if (stats instanceof BinaryStatistics) { + byte[] minBytes = stats.getMinBytes(); + byte[] maxBytes = stats.getMaxBytes(); + return String.format("min: %s max: %s nulls: %d/%d", + printable(minBytes, annotation == OriginalType.UTF8, 30), + printable(maxBytes, annotation == OriginalType.UTF8, 30), + stats.getNumNulls(), count); + } else { + throw new RuntimeException("Unknown stats type: " + stats); + } + } + + private static String printable(byte[] bytes, boolean isUtf8, int len) { + if (bytes == null) { + return "null"; + } else if (isUtf8) { + return humanReadable(new String(bytes, StandardCharsets.UTF_8), len); + } else { + return humanReadable(bytes, len); + 
} + } + + public static String humanReadable(String str, int len) { + if (str == null) { + return "null"; + } + + StringBuilder sb = new StringBuilder(); + sb.append("\""); + if (str.length() > len - 2) { + sb.append(str.substring(0, len - 5)).append("..."); + } else { + sb.append(str); + } + sb.append("\""); + + return sb.toString(); + } + + public static String humanReadable(byte[] bytes, int len) { + if (bytes == null || bytes.length == 0) { + return "null"; + } + + StringBuilder sb = new StringBuilder(); + String asString = Hex.encodeHexString(bytes); + sb.append("0x"); + if (asString.length() > len - 2) { + sb.append(asString.substring(0, (len - 5) / 2)).append("..."); + } else { + sb.append(asString); + } + + return sb.toString(); + } + + public static String shortCodec(CompressionCodecName codec) { + switch (codec) { + case UNCOMPRESSED: + return "_"; + case SNAPPY: + return "S"; + case GZIP: + return "G"; + case LZO: + return "L"; + default: + return "?"; + } + } + + public static String encodingAsString(Encoding encoding, boolean isDict) { + switch (encoding) { + case PLAIN: + return "_"; + case PLAIN_DICTIONARY: + // data pages use RLE, dictionary pages use plain + return isDict ? 
"_" : "R"; + case RLE_DICTIONARY: + return "R"; + case DELTA_BINARY_PACKED: + case DELTA_LENGTH_BYTE_ARRAY: + case DELTA_BYTE_ARRAY: + return "D"; + default: + return "?"; + } + } + + public static String encodingStatsAsString(EncodingStats encodingStats) { + StringBuilder sb = new StringBuilder(); + if (encodingStats.hasDictionaryPages()) { + for (Encoding encoding: encodingStats.getDictionaryEncodings()) { + sb.append(encodingAsString(encoding, true)); + } + sb.append(" "); + } else { + sb.append(" "); + } + + Set<Encoding> encodings = encodingStats.getDataEncodings(); + if (encodings.contains(RLE_DICTIONARY) || encodings.contains(PLAIN_DICTIONARY)) { + sb.append("R"); + } + if (encodings.contains(PLAIN)) { + sb.append("_"); + } + if (encodings.contains(DELTA_BYTE_ARRAY) || + encodings.contains(DELTA_BINARY_PACKED) || + encodings.contains(DELTA_LENGTH_BYTE_ARRAY)) { + sb.append("D"); + } + + // Check for fallback and add a flag + if (encodingStats.hasDictionaryEncodedPages() && encodingStats.hasNonDictionaryEncodedPages()) { + sb.append(" F"); + } + + return sb.toString(); + } + + public static String encodingsAsString(Set<Encoding> encodings, ColumnDescriptor desc) { + StringBuilder sb = new StringBuilder(); + if (encodings.contains(RLE) || encodings.contains(BIT_PACKED)) { + sb.append(desc.getMaxDefinitionLevel() == 0 ? "B" : "R"); + sb.append(desc.getMaxRepetitionLevel() == 0 ? 
"B" : "R"); + if (encodings.contains(PLAIN_DICTIONARY)) { + sb.append("R"); + } + if (encodings.contains(PLAIN)) { + sb.append("_"); + } + } else { + sb.append("RR"); + if (encodings.contains(RLE_DICTIONARY)) { + sb.append("R"); + } + if (encodings.contains(PLAIN)) { + sb.append("_"); + } + if (encodings.contains(DELTA_BYTE_ARRAY) || + encodings.contains(DELTA_BINARY_PACKED) || + encodings.contains(DELTA_LENGTH_BYTE_ARRAY)) { + sb.append("D"); + } + } + return sb.toString(); + } + + private static final Splitter DOT = Splitter.on('.'); + + public static ColumnDescriptor descriptor(String column, MessageType schema) { + String[] path = Iterables.toArray(DOT.split(column), String.class); + Preconditions.checkArgument(schema.containsPath(path), + "Schema doesn't have column: " + column); + return schema.getColumnDescription(path); + } + + public static String columnName(ColumnDescriptor desc) { + return Joiner.on('.').join(desc.getPath()); + } + + public static PrimitiveType primitive(MessageType schema, String[] path) { + Type current = schema; + for (String part : path) { + current = current.asGroupType().getType(part); + if (current.isPrimitive()) { + return current.asPrimitiveType(); + } + } + return null; + } + + public static PrimitiveType primitive(String column, MessageType schema) { + String[] path = Iterables.toArray(DOT.split(column), String.class); + Preconditions.checkArgument(schema.containsPath(path), + "Schema doesn't have column: " + column); + return primitive(schema, path); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java new file mode 100644 index 0000000..4fbfb9b --- /dev/null +++ 
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.cli.commands; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.beust.jcommander.internal.Lists; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import org.apache.parquet.cli.BaseCommand; +import org.apache.parquet.cli.csv.CSVProperties; +import org.apache.parquet.cli.csv.AvroCSV; +import org.slf4j.Logger; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Set; + +@Parameters(commandDescription="Build a schema from a CSV data sample") +public class CSVSchemaCommand extends BaseCommand { + + public CSVSchemaCommand(Logger console) { + super(console); + } + + @Parameter(description="<sample csv path>") + List<String> samplePaths; + + @Parameter(names={"-o", "--output"}, description="Save schema avsc to path") + String outputPath = null; + + @Parameter(names={"--class", "--record-name"}, required = true, + description="A name or class for the result schema") + String recordName = null; + + 
@Parameter(names="--minimize", + description="Minimize schema file size by eliminating white space") + boolean minimize=false; + + @Parameter(names="--delimiter", description="Delimiter character") + String delimiter = ","; + + @Parameter(names="--escape", description="Escape character") + String escape = "\\"; + + @Parameter(names="--quote", description="Quote character") + String quote = "\""; + + @Parameter(names="--no-header", description="Don't use first line as CSV header") + boolean noHeader = false; + + @Parameter(names="--skip-lines", description="Lines to skip before CSV start") + int linesToSkip = 0; + + @Parameter(names="--charset", description="Character set name", hidden = true) + String charsetName = Charset.defaultCharset().displayName(); + + @Parameter(names="--header", + description="Line to use as a header. Must match the CSV settings.") + String header; + + @Parameter(names="--require", + description="Do not allow null values for the given field") + List<String> requiredFields; + + @Override + public int run() throws IOException { + Preconditions.checkArgument(samplePaths != null && !samplePaths.isEmpty(), + "Sample CSV path is required"); + Preconditions.checkArgument(samplePaths.size() == 1, + "Only one CSV sample can be given"); + + if (header != null) { + // if a header is given on the command line, do not assume one is in the file + noHeader = true; + } + + CSVProperties props = new CSVProperties.Builder() + .delimiter(delimiter) + .escape(escape) + .quote(quote) + .header(header) + .hasHeader(!noHeader) + .linesToSkip(linesToSkip) + .charset(charsetName) + .build(); + + Set<String> required = ImmutableSet.of(); + if (requiredFields != null) { + required = ImmutableSet.copyOf(requiredFields); + } + + // assume fields are nullable by default, users can easily change this + String sampleSchema = AvroCSV + .inferNullableSchema( + recordName, open(samplePaths.get(0)), props, required) + .toString(!minimize); + + output(sampleSchema, console, 
outputPath); + + return 0; + } + + @Override + public List<String> getExamples() { + return Lists.newArrayList( + "# Print the schema for samples.csv to standard out:", + "samples.csv --record-name Sample", + "# Write schema to sample.avsc:", + "samples.csv -o sample.avsc --record-name Sample" + ); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java new file mode 100644 index 0000000..7703e88 --- /dev/null +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.cli.commands; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; +import org.apache.avro.Schema; +import org.apache.parquet.cli.BaseCommand; +import org.apache.parquet.cli.util.Expressions; +import org.slf4j.Logger; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +import static org.apache.parquet.cli.util.Expressions.select; + +@Parameters(commandDescription = "Print the first N records from a file") +public class CatCommand extends BaseCommand { + + @Parameter(description = "<file>") + List<String> sourceFiles; + + @Parameter(names={"-n", "--num-records"}, + description="The number of records to print") + long numRecords; + + @Parameter( + names = {"-c", "--column", "--columns"}, + description = "List of columns") + List<String> columns; + + public CatCommand(Logger console, long defaultNumRecords) { + super(console); + this.numRecords = defaultNumRecords; + } + + @Override + public int run() throws IOException { + Preconditions.checkArgument( + sourceFiles != null && !sourceFiles.isEmpty(), + "Missing file name"); + Preconditions.checkArgument(sourceFiles.size() == 1, + "Only one file can be given"); + + final String source = sourceFiles.get(0); + + Schema schema = getAvroSchema(source); + Schema projection = Expressions.filterSchema(schema, columns); + + Iterable<Object> reader = openDataFile(source, projection); + boolean threw = true; + long count = 0; + try { + for (Object record : reader) { + if (numRecords > 0 && count >= numRecords) { + break; + } + if (columns == null || columns.size() != 1) { + console.info(String.valueOf(record)); + } else { + console.info(String.valueOf(select(projection, record, columns.get(0)))); + } + count += 1; + } + threw = false; + } catch (RuntimeException e) { + throw new RuntimeException("Failed on 
record " + count, e); + } finally { + if (reader instanceof Closeable) { + Closeables.close((Closeable) reader, threw); + } + } + + return 0; + } + + @Override + public List<String> getExamples() { + return Lists.newArrayList( + "# Show the first 10 records in file \"data.avro\":", + "data.avro", + "# Show the first 50 records in file \"data.parquet\":", + "data.parquet -n 50" + ); + } +} + http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java new file mode 100644 index 0000000..8f60821 --- /dev/null +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.cli.commands; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.beust.jcommander.internal.Lists; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import org.apache.parquet.cli.BaseCommand; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.CorruptStatistics; +import org.apache.parquet.Version; +import org.apache.parquet.VersionParser; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.util.DynConstructors; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ColumnReader; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter; +import org.slf4j.Logger; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; + +@Parameters(commandDescription = "Check Parquet files for corrupt page and column stats (PARQUET-251)") +public class CheckParquet251Command extends BaseCommand { + + public CheckParquet251Command(Logger 
console) { + super(console); + } + + @Parameter(description = "<files>", required = true) + List<String> files; + + @Override + public int run() throws IOException { + boolean badFiles = false; + for (String file : files) { + String problem = check(file); + if (problem != null) { + badFiles = true; + console.info("{} has corrupt stats: {}", file, problem); + } else { + console.info("{} has no corrupt stats", file); + } + } + + return badFiles ? 1 : 0; + } + + /** + * Validates the binary column statistics of one file. + * + * @param file location of the Parquet file to inspect + * @return the message of the first BadStatsException raised while + * validating, or null when CorruptStatistics does not flag the file's + * created-by string or no mismatch is found + * @throws IOException if the footer or the row groups cannot be read + */ + private String check(String file) throws IOException { + Path path = qualifiedPath(file); + ParquetMetadata footer = ParquetFileReader.readFooter( + getConf(), path, ParquetMetadataConverter.NO_FILTER); + + FileMetaData meta = footer.getFileMetaData(); + String createdBy = meta.getCreatedBy(); + if (CorruptStatistics.shouldIgnoreStatistics(createdBy, BINARY)) { + // create fake metadata that will read corrupt stats and return them + FileMetaData fakeMeta = new FileMetaData( + meta.getSchema(), meta.getKeyValueMetaData(), Version.FULL_VERSION); + + // get just the binary columns + List<ColumnDescriptor> columns = Lists.newArrayList(); + Iterables.addAll(columns, Iterables.filter( + meta.getSchema().getColumns(), + new Predicate<ColumnDescriptor>() { + @Override + public boolean apply(@Nullable ColumnDescriptor input) { + return input != null && input.getType() == BINARY; + } + })); + + // now check to see if the data is actually corrupt; try-with-resources + // closes the reader on every exit path, including the early return + // taken when bad stats are found + try (ParquetFileReader reader = new ParquetFileReader(getConf(), + fakeMeta, path, footer.getBlocks(), columns)) { + PageStatsValidator validator = new PageStatsValidator(); + for (PageReadStore pages = reader.readNextRowGroup(); pages != null; + pages = reader.readNextRowGroup()) { + validator.validate(columns, pages); + } + } catch (BadStatsException e) { + return e.getMessage(); + } + } + + return null; + } + + @Override + public List<String> getExamples() { + return Arrays.asList( + "# Check file1.parquet for corrupt page and column stats", + "file1.parquet"); + } + + + 
public static class BadStatsException extends RuntimeException { + public BadStatsException(String message) { + super(message); + } + } + + public class SingletonPageReader implements PageReader { + private final DictionaryPage dict; + private final DataPage data; + + public SingletonPageReader(DictionaryPage dict, DataPage data) { + this.dict = dict; + this.data = data; + } + + @Override + public DictionaryPage readDictionaryPage() { + return dict; + } + + @Override + public long getTotalValueCount() { + return data.getValueCount(); + } + + @Override + public DataPage readPage() { + return data; + } + } + + private static <T extends Comparable<T>> + Statistics<T> getStatisticsFromPageHeader(DataPage page) { + return page.accept(new DataPage.Visitor<Statistics<T>>() { + @Override + @SuppressWarnings("unchecked") + public Statistics<T> visit(DataPageV1 dataPageV1) { + return (Statistics<T>) dataPageV1.getStatistics(); + } + + @Override + @SuppressWarnings("unchecked") + public Statistics<T> visit(DataPageV2 dataPageV2) { + return (Statistics<T>) dataPageV2.getStatistics(); + } + }); + } + + private class StatsValidator<T extends Comparable<T>> { + private final boolean hasNonNull; + private final T min; + private final T max; + + public StatsValidator(DataPage page) { + Statistics<T> stats = getStatisticsFromPageHeader(page); + this.hasNonNull = stats.hasNonNullValue(); + if (hasNonNull) { + this.min = stats.genericGetMin(); + this.max = stats.genericGetMax(); + } else { + this.min = null; + this.max = null; + } + } + + public void validate(T value) { + if (hasNonNull) { + if (min.compareTo(value) > 0) { + throw new BadStatsException("Min should be <= all values."); + } + if (max.compareTo(value) < 0) { + throw new BadStatsException("Max should be >= all values."); + } + } + } + } + + private PrimitiveConverter getValidatingConverter( + final DataPage page, PrimitiveTypeName type) { + return type.convert(new PrimitiveTypeNameConverter<PrimitiveConverter, 
RuntimeException>() { + @Override + public PrimitiveConverter convertFLOAT(PrimitiveTypeName primitiveTypeName) { + final StatsValidator<Float> validator = new StatsValidator<Float>(page); + return new PrimitiveConverter() { + @Override + public void addFloat(float value) { + validator.validate(value); + } + }; + } + + @Override + public PrimitiveConverter convertDOUBLE(PrimitiveTypeName primitiveTypeName) { + final StatsValidator<Double> validator = new StatsValidator<Double>(page); + return new PrimitiveConverter() { + @Override + public void addDouble(double value) { + validator.validate(value); + } + }; + } + + @Override + public PrimitiveConverter convertINT32(PrimitiveTypeName primitiveTypeName) { + final StatsValidator<Integer> validator = new StatsValidator<Integer>(page); + return new PrimitiveConverter() { + @Override + public void addInt(int value) { + validator.validate(value); + } + }; + } + + @Override + public PrimitiveConverter convertINT64(PrimitiveTypeName primitiveTypeName) { + final StatsValidator<Long> validator = new StatsValidator<Long>(page); + return new PrimitiveConverter() { + @Override + public void addLong(long value) { + validator.validate(value); + } + }; + } + + @Override + public PrimitiveConverter convertBOOLEAN(PrimitiveTypeName primitiveTypeName) { + final StatsValidator<Boolean> validator = new StatsValidator<Boolean>(page); + return new PrimitiveConverter() { + @Override + public void addBoolean(boolean value) { + validator.validate(value); + } + }; + } + + @Override + public PrimitiveConverter convertINT96(PrimitiveTypeName primitiveTypeName) { + return convertBINARY(primitiveTypeName); + } + + @Override + public PrimitiveConverter convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) { + return convertBINARY(primitiveTypeName); + } + + @Override + public PrimitiveConverter convertBINARY(PrimitiveTypeName primitiveTypeName) { + final StatsValidator<Binary> validator = new StatsValidator<Binary>(page); + return new 
PrimitiveConverter() { + @Override + public void addBinary(Binary value) { + validator.validate(value); + } + }; + } + }); + } + + private static final DynConstructors.Ctor<ColumnReader> COL_READER_CTOR = + new DynConstructors.Builder(ColumnReader.class) + .hiddenImpl("org.apache.parquet.column.impl.ColumnReaderImpl", + ColumnDescriptor.class, PageReader.class, + PrimitiveConverter.class, VersionParser.ParsedVersion.class) + .build(); + + public class PageStatsValidator { + public void validate(List<ColumnDescriptor> columns, PageReadStore store) { + for (ColumnDescriptor desc : columns) { + PageReader reader = store.getPageReader(desc); + DictionaryPage dict = reader.readDictionaryPage(); + DictionaryPage reusableDict = null; + if (dict != null) { + try { + reusableDict = new DictionaryPage( + BytesInput.from(dict.getBytes().toByteArray()), + dict.getDictionarySize(), dict.getEncoding()); + } catch (IOException e) { + throw new ParquetDecodingException("Cannot read dictionary", e); + } + } + DataPage page; + while ((page = reader.readPage()) != null) { + validateStatsForPage(page, reusableDict, desc); + } + } + } + + private void validateStatsForPage(DataPage page, DictionaryPage dict, + ColumnDescriptor desc) { + SingletonPageReader reader = new SingletonPageReader(dict, page); + PrimitiveConverter converter = getValidatingConverter(page, desc.getType()); + Statistics stats = getStatisticsFromPageHeader(page); + + long numNulls = 0; + + ColumnReader column = COL_READER_CTOR.newInstance(desc, reader, converter, null); + for (int i = 0; i < reader.getTotalValueCount(); i += 1) { + if (column.getCurrentDefinitionLevel() >= desc.getMaxDefinitionLevel()) { + column.writeCurrentValueToConverter(); + } else { + numNulls += 1; + } + column.consume(); + } + + if (numNulls != stats.getNumNulls()) { + throw new BadStatsException("Number of nulls doesn't match."); + } + + console.debug(String.format( + "Validated stats min=%s max=%s nulls=%d for page=%s col=%s", + 
String.valueOf(stats.genericGetMin()), + String.valueOf(stats.genericGetMax()), stats.getNumNulls(), page, + Arrays.toString(desc.getPath()))); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java new file mode 100644 index 0000000..624ba91 --- /dev/null +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.apache.parquet.cli.commands;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.parquet.cli.BaseCommand;
import org.apache.parquet.cli.csv.AvroCSVReader;
import org.apache.parquet.cli.csv.CSVProperties;
import org.apache.parquet.cli.csv.AvroCSV;
import org.apache.parquet.cli.util.Schemas;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.cli.util.Codecs;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Set;

import static org.apache.avro.generic.GenericData.Record;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;

/**
 * Command that reads one CSV source and writes its records to a Parquet file
 * through {@link AvroParquetWriter}.
 * <p>
 * The Avro schema is read from {@code --schema} when given; otherwise it is
 * inferred from the CSV data, with the record name taken from the source file
 * name up to its first {@code '.'}.
 */
@Parameters(commandDescription="Create a file from CSV data")
public class ConvertCSVCommand extends BaseCommand {

  public ConvertCSVCommand(Logger console) {
    super(console);
  }

  @Parameter(description="<csv path>")
  List<String> targets;

  @Parameter(
      names={"-o", "--output"},
      description="Output file path",
      required=true)
  String outputPath = null;

  @Parameter(
      names={"-2", "--format-version-2", "--writer-version-2"},
      description="Use Parquet format version 2",
      hidden = true)
  boolean v2 = false;

  @Parameter(names="--delimiter", description="Delimiter character")
  String delimiter = ",";

  @Parameter(names="--escape", description="Escape character")
  String escape = "\\";

  @Parameter(names="--quote", description="Quote character")
  String quote = "\"";

  @Parameter(names="--no-header", description="Don't use first line as CSV header")
  boolean noHeader = false;

  @Parameter(names="--skip-lines", description="Lines to skip before CSV start")
  int linesToSkip = 0;

  @Parameter(names="--charset", description="Character set name", hidden = true)
  String charsetName = Charset.defaultCharset().displayName();

  @Parameter(names="--header",
      description="Line to use as a header. Must match the CSV settings.")
  String header;

  @Parameter(names="--require",
      description="Do not allow null values for the given field")
  List<String> requiredFields;

  @Parameter(names = {"-s", "--schema"},
      description = "The file containing the Avro schema.")
  String avroSchemaFile;

  @Parameter(names = {"--compression-codec"},
      description = "A compression codec name.")
  String compressionCodecName = "GZIP";

  @Parameter(names="--row-group-size", description="Target row group size")
  int rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;

  @Parameter(names="--page-size", description="Target page size")
  int pageSize = ParquetWriter.DEFAULT_PAGE_SIZE;

  @Parameter(names="--dictionary-size", description="Max dictionary page size")
  int dictionaryPageSize = ParquetWriter.DEFAULT_PAGE_SIZE;

  // The previous description spoke of a "view or dataset", which this command
  // does not have; the flag only selects the writer's OVERWRITE mode.
  @Parameter(
      names={"--overwrite"},
      description="Overwrite the output file if it already exists")
  boolean overwrite = false;

  /**
   * Converts the single CSV source to a Parquet file at {@code outputPath}.
   *
   * @return 0 on success
   * @throws IOException if a source, schema or output file cannot be opened,
   *         read or written
   * @throws IllegalArgumentException if not exactly one CSV path was given
   * @throws RuntimeException wrapping any runtime failure raised while
   *         writing; the message carries the number of records written so far
   */
  @Override
  @SuppressWarnings("unchecked")
  public int run() throws IOException {
    Preconditions.checkArgument(targets != null && targets.size() == 1,
        "CSV path is required.");

    if (header != null) {
      // if a header is given on the command line, don't assume one is in the file
      noHeader = true;
    }

    CSVProperties props = new CSVProperties.Builder()
        .delimiter(delimiter)
        .escape(escape)
        .quote(quote)
        .header(header)
        .hasHeader(!noHeader)
        .linesToSkip(linesToSkip)
        .charset(charsetName)
        .build();

    String source = targets.get(0);

    Schema csvSchema;
    if (avroSchemaFile != null) {
      csvSchema = Schemas.fromAvsc(open(avroSchemaFile));
    } else {
      Set<String> required = ImmutableSet.of();
      if (requiredFields != null) {
        required = ImmutableSet.copyOf(requiredFields);
      }

      // the record name is the file name up to its first '.'
      String filename = new File(source).getName();
      String recordName;
      if (filename.contains(".")) {
        recordName = filename.substring(0, filename.indexOf("."));
      } else {
        recordName = filename;
      }

      csvSchema = AvroCSV.inferNullableSchema(
          recordName, open(source), props, required);
    }

    // number of records written so far; reported if the conversion fails
    long count = 0;
    try (AvroCSVReader<Record> reader = new AvroCSVReader<>(
        open(source), props, csvSchema, Record.class, true)) {
      CompressionCodecName codec = Codecs.parquetCodec(compressionCodecName);
      try (ParquetWriter<Record> writer = AvroParquetWriter
          .<Record>builder(qualifiedPath(outputPath))
          .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
          .withWriteMode(overwrite ?
              ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
          .withCompressionCodec(codec)
          .withDictionaryEncoding(true)
          .withDictionaryPageSize(dictionaryPageSize)
          .withPageSize(pageSize)
          .withRowGroupSize(rowGroupSize)
          .withDataModel(GenericData.get())
          .withConf(getConf())
          .withSchema(csvSchema)
          .build()) {
        for (Record record : reader) {
          writer.write(record);
          // previously never incremented, so failures always reported record 0
          count += 1;
        }
      } catch (RuntimeException e) {
        throw new RuntimeException("Failed on record " + count, e);
      }
    }

    return 0;
  }

  /**
   * Returns the usage examples for this command as alternating
   * {@code #}-prefixed description lines and argument lines. Each example
   * passes one CSV path and names the output with the required {@code -o}
   * option.
   */
  @Override
  public List<String> getExamples() {
    return Lists.newArrayList(
        "# Create a Parquet file from a CSV file",
        "sample.csv -o sample.parquet --schema schema.avsc",
        "# Create a Parquet file in HDFS from local CSV",
        "path/to/sample.csv -o hdfs:/user/me/sample.parquet --schema schema.avsc",
        "# Create a Parquet file from CSV data in S3",
        "s3:/data/path/sample.csv -o sample.parquet --schema s3:/schemas/schema.avsc"
    );
  }
}