This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d4d910cec1 FileWriter bug fixes (#12208)
d4d910cec1 is described below
commit d4d910cec11569c5a0b6f986b361e0b3c1903962
Author: Shounak kulkarni <[email protected]>
AuthorDate: Tue Jan 2 23:03:24 2024 +0530
FileWriter bug fixes (#12208)
* avoid writing a header line for every file format (only the CSV writer emits one)
* ensure the total number of docs written matches the spec
* introduce preprocess step for every file being written
* extract headers once and reuse them
---
.../recommender/data/writer/CsvWriter.java | 14 +++++++++++++
.../recommender/data/writer/FileWriter.java | 23 ++++++++++++++--------
2 files changed, 29 insertions(+), 8 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
index 88547cc757..d3587336da 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java
@@ -25,6 +25,14 @@ import
org.apache.pinot.controller.recommender.data.generator.DataGenerator;
public class CsvWriter extends FileWriter {
+ private String _headers;
+
+ @Override
+ public void init(WriterSpec spec) {
+ super.init(spec);
+ _headers = StringUtils.join(_spec.getGenerator().nextRow().keySet(), ",");
+ }
+
@Override
protected String generateRow(DataGenerator generator) {
Map<String, Object> row = generator.nextRow();
@@ -38,6 +46,12 @@ public class CsvWriter extends FileWriter {
return StringUtils.join(values, ",");
}
+ @Override
+ protected void preprocess(java.io.FileWriter writer)
+ throws Exception {
+ writer.append(_headers).append('\n');
+ }
+
private Object serializeIfMultiValue(Object obj) {
if (obj instanceof List) {
return StringUtils.join((List) obj, ";");
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
index b74c2ebbde..f02f245ced 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.recommender.data.writer;
import java.io.File;
import java.util.Objects;
-import org.apache.commons.lang.StringUtils;
import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory;
public abstract class FileWriter implements Writer {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileWriter.class);
- private FileWriterSpec _spec;
+ protected FileWriterSpec _spec;
@Override
public void init(WriterSpec spec) {
_spec = (FileWriterSpec) spec;
@@ -38,21 +37,29 @@ public abstract class FileWriter implements Writer {
@Override
public void write()
throws Exception {
- final int numPerFiles = (int) (_spec.getTotalDocs() / _spec.getNumFiles());
- final String headers =
StringUtils.join(_spec.getGenerator().nextRow().keySet(), ",");
+ long totalDocs = _spec.getTotalDocs();
+ final long docsPerFile = (long) Math.ceil((double) totalDocs /
_spec.getNumFiles());
final String extension = getExtension() == null ? "" :
String.format(".%s", getExtension());
- for (int i = 0; i < _spec.getNumFiles(); i++) {
+ long ingestedDocs = 0;
+ int fileIndex = 0;
+ while (ingestedDocs < totalDocs) {
try (java.io.FileWriter writer =
- new java.io.FileWriter(new File(_spec.getBaseDir(),
String.format("output_%d%s", i, extension)))) {
- writer.append(headers).append('\n');
- for (int j = 0; j < numPerFiles; j++) {
+ new java.io.FileWriter(new File(_spec.getBaseDir(),
String.format("output_%d%s", fileIndex, extension)))) {
+ preprocess(writer);
+ for (int j = 0; j < docsPerFile && ingestedDocs < totalDocs; j++) {
String appendString = generateRow(_spec.getGenerator());
writer.append(appendString).append('\n');
+ ingestedDocs++;
}
}
+ fileIndex++;
}
}
+ protected void preprocess(java.io.FileWriter writer)
+ throws Exception {
+ }
+
protected String getExtension() {
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]