[
https://issues.apache.org/jira/browse/DRILL-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448894#comment-17448894
]
ASF GitHub Bot commented on DRILL-7978:
---------------------------------------
cgivre commented on a change in pull request #2282:
URL: https://github.com/apache/drill/pull/2282#discussion_r756495900
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+ private FileSplit split;
+ private final int maxRecords;
+ private final FixedwidthFormatConfig config;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private RowSetLoader writer;
+ private BufferedReader reader;
+ private int lineNum;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ lineNum = 0;
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(), true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+ return true;
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+ String line;
+ RowSetLoader writer = loader.writer();
Review comment:
This line should be in the `open()` method.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+ private FileSplit split;
+ private final int maxRecords;
+ private final FixedwidthFormatConfig config;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private RowSetLoader writer;
+ private BufferedReader reader;
+ private int lineNum;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ lineNum = 0;
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(), true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+ return true;
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+ String line;
+ RowSetLoader writer = loader.writer();
+
+ try {
+ line = reader.readLine();
Review comment:
Why not include this in the loop?
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFormatPlugin.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
+
+public class FixedwidthFormatPlugin extends
EasyFormatPlugin<FixedwidthFormatConfig> {
+
+ protected static final String DEFAULT_NAME = "fixedwidth";
+
+ private static class FixedwidthReaderFactory extends FileReaderFactory {
+
+ private final FixedwidthFormatConfig config;
+ private final int maxRecords;
+
+ public FixedwidthReaderFactory(FixedwidthFormatConfig config, int
maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+ return new FixedwidthBatchReader(config, maxRecords);
+ }
+ }
+
+ public FixedwidthFormatPlugin(String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ StoragePluginConfig storageConfig,
+ FixedwidthFormatConfig formatConfig) {
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig,
formatConfig);
+ }
+
+ private static EasyFormatConfig easyConfig(Configuration fsConf,
FixedwidthFormatConfig pluginConfig) {
+ return EasyFormatConfig.builder()
+ .readable(true)
+ .writable(false)
+ .blockSplittable(false) // Change to true
Review comment:
Change to true.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+ private FileSplit split;
+ private final int maxRecords;
+ private final FixedwidthFormatConfig config;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private RowSetLoader writer;
+ private BufferedReader reader;
+ private int lineNum;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ lineNum = 0;
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(), true);
Review comment:
Here's where you can check to see whether the user provided a schema or
not. You could do something like this:
```java
if (negotiator.hasProvidedSchema()) {
TupleMetadata providedSchema = negotiator.providedSchema();
// Build column writer array
negotiator.tableSchema(finalSchema, true);
} else {
negotiator.tableSchema(buildSchema(), true);
}
```
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+ private FileSplit split;
+ private final int maxRecords;
+ private final FixedwidthFormatConfig config;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private RowSetLoader writer;
+ private BufferedReader reader;
+ private int lineNum;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ lineNum = 0;
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(), true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
Review comment:
You can remove this second line. Also, please add `e.getMessage()` to
the message line.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+ private FileSplit split;
+ private final int maxRecords;
+ private final FixedwidthFormatConfig config;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private RowSetLoader writer;
+ private BufferedReader reader;
+ private int lineNum;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ lineNum = 0;
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(), true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+ return true;
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+ String line;
+ RowSetLoader writer = loader.writer();
+
+ try {
+ line = reader.readLine();
+ while (!writer.isFull() && line != null) {
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+ line = reader.readLine();
+ lineNum++;
+ }
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
Review comment:
For the error message, you don't need to have multiple `addContext()`
calls. The main thing is to pass the `errorContext`. I would add the
`e.getMessage()` to the `message()` call.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+ private FileSplit split;
+ private final int maxRecords;
+ private final FixedwidthFormatConfig config;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private RowSetLoader writer;
+ private BufferedReader reader;
+ private int lineNum;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ lineNum = 0;
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(), true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+ return true;
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+ String line;
+ RowSetLoader writer = loader.writer();
+
+ try {
+ line = reader.readLine();
+ while (!writer.isFull() && line != null) {
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+ line = reader.readLine();
+ lineNum++;
+ }
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
Review comment:
Here and elsewhere.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+ private FileSplit split;
+ private final int maxRecords;
+ private final FixedwidthFormatConfig config;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private RowSetLoader writer;
+ private BufferedReader reader;
+ private int lineNum;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ lineNum = 0;
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(), true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+ return true;
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+ String line;
+ RowSetLoader writer = loader.writer();
+
+ try {
+ line = reader.readLine();
+ while (!writer.isFull() && line != null) {
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+ line = reader.readLine();
+ lineNum++;
+ }
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .addContext("Line Number", lineNum)
+ .build(logger);
+ }
+ return writer.limitReached(maxRecords); // returns false when maxRecords
limit has been reached
Review comment:
The `next()` method needs some work. Really this should be called
`nextBatch()`, as the `next()` method returns `true` when there is more data
to read, `false` if not.
```java
@Override
public boolean next() {
while (!rowWriter.isFull()) {
if (!processNextLine()) {
return false;
}
}
return true;
}
```
This method will iterate through the batch of data, and when the `rowWriter`
is full (i.e. the batch is full) it will stop reading, BUT the method will
return `true` because there is more data to read. The limit is pushed down in
the `processNextLine()` method.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+ private FileSplit split;
+ private final int maxRecords;
+ private final FixedwidthFormatConfig config;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private RowSetLoader writer;
+ private BufferedReader reader;
+ private int lineNum;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ lineNum = 0;
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(), true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+ return true;
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+ String line;
+ RowSetLoader writer = loader.writer();
+
+ try {
+ line = reader.readLine();
+ while (!writer.isFull() && line != null) {
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+ line = reader.readLine();
+ lineNum++;
+ }
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .addContext("Line Number", lineNum)
+ .build(logger);
+ }
+ return writer.limitReached(maxRecords); // returns false when maxRecords
limit has been reached
+ }
+
+ @Override
+ public void close() {
+ if (fsStream != null){
+ AutoCloseables.closeSilently(fsStream);
Review comment:
This line should be out of the if statement.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+ private FileSplit split;
+ private final int maxRecords;
+ private final FixedwidthFormatConfig config;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private ResultSetLoader loader;
+ private RowSetLoader writer;
+ private BufferedReader reader;
+ private int lineNum;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ lineNum = 0;
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(), true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+ return true;
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+ String line;
+ RowSetLoader writer = loader.writer();
+
+ try {
+ line = reader.readLine();
+ while (!writer.isFull() && line != null) {
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+ line = reader.readLine();
+ lineNum++;
+ }
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}", split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .addContext("Line Number", lineNum)
+ .build(logger);
+ }
+ return writer.limitReached(maxRecords); // returns false when maxRecords
limit has been reached
+ }
+
+ @Override
+ public void close() {
+ if (fsStream != null){
+ AutoCloseables.closeSilently(fsStream);
+ fsStream = null;
+ }
+ }
+
+ private TupleMetadata buildSchema() {
+ SchemaBuilder builder = new SchemaBuilder();
+ for (FixedwidthFieldConfig field : config.getFields()) {
+ if (field.getType() == TypeProtos.MinorType.VARDECIMAL){
+ builder.addNullable(field.getName(),
TypeProtos.MinorType.VARDECIMAL,38,4);
+ //revisit this
+ } else {
+ builder.addNullable(field.getName(), field.getType());
+ }
+ }
+ return builder.buildSchema();
+ }
+
+
+ private boolean parseLine(String line, RowSetLoader writer) throws
IOException {
+ int i = 0;
+ TypeProtos.MinorType dataType;
+ String dateTimeFormat;
+ String value;
+ for (FixedwidthFieldConfig field : config.getFields()) {
Review comment:
My original understanding was that the fixed width plugin
would work in a similar manner to the log regex reader, where the
user provides the schema in the config — either in the format config or at query
time using the `table()` function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> Fixed Width Format Plugin
> -------------------------
>
> Key: DRILL-7978
> URL: https://issues.apache.org/jira/browse/DRILL-7978
> Project: Apache Drill
> Issue Type: New Feature
> Components: Storage - Other
> Reporter: Megan Foss
> Priority: Major
>
> Developing format plugin to parse fixed width files.
> Fixed Width Text File Definition:
> https://www.oracle.com/webfolder/technetwork/data-quality/edqhelp/Content/introduction/getting_started/configuring_fixed_width_text_file_formats.htm
--
This message was sent by Atlassian Jira
(v8.20.1#820001)