Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19269#discussion_r142004831

    --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleWritableDataSource.java ---
    @@ -0,0 +1,297 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package test.org.apache.spark.sql.sources.v2;
    +
    +import java.io.FileWriter;
    +import java.io.IOException;
    +import java.io.PrintWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Optional;
    +
    +import org.apache.spark.sql.Row;
    +import org.apache.spark.sql.SaveMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.catalyst.expressions.GenericRow;
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
    +import org.apache.spark.sql.sources.v2.DataSourceV2;
    +import org.apache.spark.sql.sources.v2.DataSourceV2Options;
    +import org.apache.spark.sql.sources.v2.ReadSupport;
    +import org.apache.spark.sql.sources.v2.WriteSupport;
    +import org.apache.spark.sql.sources.v2.reader.DataReader;
    +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
    +import org.apache.spark.sql.sources.v2.reader.ReadTask;
    +import org.apache.spark.sql.sources.v2.writer.*;
    +import org.apache.spark.sql.types.DataType;
    +import org.apache.spark.sql.types.StructType;
    +
    +public class JavaSimpleWritableDataSource implements DataSourceV2, ReadSupport, WriteSupport {
    +  private StructType schema = new StructType().add("i", "long").add("j", "long");
    +
    +  class Reader implements DataSourceV2Reader {
    +    private String path;
    +
    +    Reader(String path) {
    +      this.path = path;
    +    }
    +
    +    @Override
    +    public StructType readSchema() {
    +      return schema;
    +    }
    +
    +    @Override
    +    public List<ReadTask<Row>> createReadTasks() {
    +      return java.util.Arrays.asList(new JavaSimpleCSVReadTask(path));
    +    }
    +  }
    +
    +  static class JavaSimpleCSVReadTask implements ReadTask<Row>, DataReader<Row> {
    +    private String path;
    +    private volatile Iterator<String> lines;
    +    private volatile String currentLine;
    +
    +    JavaSimpleCSVReadTask(Iterator<String> lines) {
    +      this.lines = lines;
    +    }
    +
    +    JavaSimpleCSVReadTask(String path) {
    +      this.path = path;
    +    }
    +
    +    @Override
    +    public DataReader<Row> createReader() {
    +      assert path != null;
    +      try {
    +        if (Files.exists(Paths.get(path))) {
    +          return new JavaSimpleCSVReadTask(Files.readAllLines(Paths.get(path)).iterator());
    +        } else {
    +          return new JavaSimpleCSVReadTask(Collections.emptyIterator());
    +        }
    +      } catch (IOException e) {
    +        throw new RuntimeException(e);
    +      }
    +    }
    +
    +    @Override
    +    public boolean next() {
    +      if (lines.hasNext()) {
    +        currentLine = lines.next();
    +        return true;
    +      } else {
    +        return false;
    +      }
    +    }
    +
    +    @Override
    +    public Row get() {
    +      String[] values = currentLine.split(",");
    +      assert values.length == 2;
    +      long l1 = Long.valueOf(values[0]);
    +      long l2 = Long.valueOf(values[1]);
    +      return new GenericRow(new Object[] {l1, l2});
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +
    +    }
    +  }
    +
    +  @Override
    +  public DataSourceV2Reader createReader(DataSourceV2Options options) {
    +    return new Reader(options.get("path").get());
    +  }
    +
    +
    +  class Writer implements DataSourceV2Writer {
    +    private String path;
    +
    +    Writer(String path) {
    +      this.path = path;
    +    }
    +
    +    @Override
    +    public DataWriteFactory<Row> createWriterFactory() {
    +      return new JavaSimpleCSVDataWriteFactory(path);
    +    }
    +
    +    @Override
    +    public void commit(WriterCommitMessage[] messages) {
    +
    +    }
    +
    +    @Override
    +    public void abort() {
    +
    +    }
    +  }
    +
    +  class InternalRowWriter implements DataSourceV2Writer, SupportsWriteInternalRow {
    +    private String path;
    +
    +    InternalRowWriter(String path) {
    +      this.path = path;
    +    }
    +
    +    @Override
    +    public DataWriteFactory<InternalRow> createInternalRowWriterFactory() {
    +      return new JavaInternalRowCSVDataWriteFactory(path);
    +    }
    +
    +    @Override
    +    public void commit(WriterCommitMessage[] messages) {
    +
    +    }
    +
    +    @Override
    +    public void abort() {
    +
    +    }
    +  }
    +
    +  static class JavaSimpleCSVDataWriteFactory implements DataWriteFactory<Row>, DataWriter<Row> {
    +    private String path;
    +    private volatile PrintWriter fileWriter;
    +
    +    JavaSimpleCSVDataWriteFactory(String path) {
    +      this.path = path;
    +    }
    +
    +    JavaSimpleCSVDataWriteFactory(String path, PrintWriter fileWriter) {
    +      this.path = path;
    +      this.fileWriter = fileWriter;
    +    }
    +
    +    @Override
    +    public DataWriter<Row> createWriter(int stageId, int partitionId, int attemptNumber) {
    +      try {
    +        return new JavaSimpleCSVDataWriteFactory(
    +          path, new PrintWriter(new FileWriter(path, true)));
    +      } catch (IOException e) {
    +        throw new RuntimeException(e);
    +      }
    +    }
    +
    +    @Override
    +    public void write(Row record) {
    +      fileWriter.println(record.getLong(0) + "," + record.getLong(1));
    +    }
    +
    +    @Override
    +    public WriterCommitMessage commit() {
    +      fileWriter.close();
    +      return null;
    +    }
    +
    +    @Override
    +    public void abort() {
    +      fileWriter.close();
    --- End diff --

    If you are catching and wrapping exceptions, this close() call should also be included in a try/catch clause.
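
    For illustration only (not part of the PR): a minimal sketch of what that
    suggestion could look like in JavaSimpleCSVDataWriteFactory, mirroring the
    catch-and-wrap convention createWriter() already uses. Note that
    PrintWriter.close() itself swallows IOException and records it in an
    internal flag, so the sketch also surfaces that flag via checkError().

        @Override
        public void abort() {
          try {
            // Same convention as createWriter(): catch and wrap rather than
            // let a close-time failure escape silently.
            fileWriter.close();
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
          // PrintWriter.close() does not throw IOException; it records IO
          // failures internally, so check for them and surface them explicitly.
          if (fileWriter.checkError()) {
            throw new RuntimeException("Error closing writer for " + path);
          }
        }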