ZihanLi58 commented on code in PR #3618: URL: https://github.com/apache/gobblin/pull/3618#discussion_r1053611194
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/Manifest.java: ########## @@ -0,0 +1,117 @@ +package org.apache.gobblin.data.management.copy; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * Manifest schema and serDe + * https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Manifest+based+distcp+runbook + */ +public class Manifest { + private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); + private static final Type CopyableUnitListType = new TypeToken<ArrayList<CopyableUnit>>(){}.getType(); + + + public final ArrayList<CopyableUnit> _copyableUnits; + + public Manifest() { + _copyableUnits = new ArrayList<>(); + } + + public Manifest(ArrayList<CopyableUnit> copyableUnits) { + _copyableUnits = copyableUnits; + } + + public void add(Manifest.CopyableUnit copyableUnit) { + _copyableUnits.add(copyableUnit); + } + + public static class CopyableUnit { + public final Integer id; + public final String fileName; + public final String fileGroup; + public final Integer fileSizeInBytes; Review Comment: This should be long? 
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/Manifest.java: ########## @@ -0,0 +1,117 @@ +package org.apache.gobblin.data.management.copy; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * Manifest schema and serDe + * https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Manifest+based+distcp+runbook + */ +public class Manifest { + private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); + private static final Type CopyableUnitListType = new TypeToken<ArrayList<CopyableUnit>>(){}.getType(); + + + public final ArrayList<CopyableUnit> _copyableUnits; + + public Manifest() { + _copyableUnits = new ArrayList<>(); + } + + public Manifest(ArrayList<CopyableUnit> copyableUnits) { + _copyableUnits = copyableUnits; + } + + public void add(Manifest.CopyableUnit copyableUnit) { + _copyableUnits.add(copyableUnit); + } + + public static class CopyableUnit { + public final Integer id; + public final String fileName; + public final String fileGroup; + public final Integer fileSizeInBytes; + public final Integer fileModificationTime; + + public CopyableUnit(Integer id, String fileName, String fileGroup, Integer fileSizeInBytes, Integer fileModificationTime) { + this.id = id; + this.fileName = fileName; + this.fileGroup = fileGroup; + this.fileSizeInBytes = fileSizeInBytes; + this.fileModificationTime = fileModificationTime; + } + } + + /** + * + * @param fs + * @param path path manifest file location + * @return + * @throws IOException + */ + public static Manifest read(FileSystem 
fs, Path path) throws IOException { + JsonReader jsonReader = new JsonReader(new InputStreamReader(fs.open(path), "UTF-8")); + return new Manifest(GSON.fromJson(jsonReader, CopyableUnitListType)); + } + + /** + * + * @param fs + * @param path path manifest file location + * @throws IOException + */ + public void write(FileSystem fs, Path path) throws IOException { + String outputJson = GSON.toJson(this._copyableUnits, CopyableUnitListType); + FSDataOutputStream out = fs.create(path, true); + out.write(outputJson.getBytes(StandardCharsets.UTF_8)); + out.flush(); + out.close(); + } + + public static ManifestIterator getReadIterator(FileSystem fs, Path path) throws IOException { + return new ManifestIterator(fs, path); + } + + public static class ManifestIterator implements Iterator { Review Comment: Change the class name to be CopyableUnitIterator? ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TestManifest.java: ########## @@ -0,0 +1,67 @@ +package org.apache.gobblin.data.management.dataset; + +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import org.apache.gobblin.data.management.copy.Manifest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestManifest { + private FileSystem localFs; + + + public TestManifest() throws IOException { + localFs = FileSystem.getLocal(new Configuration()); + } + + @Test + public void manifestSanityRead() throws IOException { + //Get manifest Path + String manifestPath = + getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath(); + + Manifest manifest = Manifest.read(localFs, new Path(manifestPath)); + Assert.assertEquals(manifest._copyableUnits.size(), 2); + Manifest.CopyableUnit cu = manifest._copyableUnits.get(0); + int id = cu.id.intValue(); + 
Assert.assertEquals(id, 1); + Assert.assertEquals(cu.fileName, "/tmp/dataset/test1.txt"); + } + + @Test + public void manifestSanityWrite() throws IOException { + File tmpDir = Files.createTempDir(); + Path output = new Path(tmpDir.getAbsolutePath(), "test"); + Manifest manifest = new Manifest(); + manifest.add(new Manifest.CopyableUnit(null, "testfilename", null, null, null)); + manifest.write(localFs, output); + + Manifest readManifest = Manifest.read(localFs, output); + Assert.assertEquals(manifest._copyableUnits.size(), 1); + Assert.assertEquals(manifest._copyableUnits.get(0).fileName, "testfilename"); + } + + @Test + public void manifestSanityReadIterator() throws IOException { + //Get manifest Path + String manifestPath = + getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath(); + + Manifest manifest = Manifest.read(localFs, new Path(manifestPath)); + + Manifest.ManifestIterator manifestIterator = Manifest.getReadIterator(localFs, new Path(manifestPath)); + int count = 0; + while (manifestIterator.hasNext()) { + Manifest.CopyableUnit cu = manifestIterator.next(); + Assert.assertEquals(cu.fileName, manifest._copyableUnits.get(count).fileName); + Assert.assertEquals(cu.id, manifest._copyableUnits.get(count).id); + Assert.assertEquals(cu.fileGroup, manifest._copyableUnits.get(count).fileGroup); + count++; + } Review Comment: Can we also assert on count to make sure we reach the end? Also, do we need to close the iterator at the end of the test? 
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/Manifest.java: ########## @@ -0,0 +1,117 @@ +package org.apache.gobblin.data.management.copy; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * Manifest schema and serDe + * https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Manifest+based+distcp+runbook + */ +public class Manifest { + private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); + private static final Type CopyableUnitListType = new TypeToken<ArrayList<CopyableUnit>>(){}.getType(); + + + public final ArrayList<CopyableUnit> _copyableUnits; + + public Manifest() { + _copyableUnits = new ArrayList<>(); + } + + public Manifest(ArrayList<CopyableUnit> copyableUnits) { + _copyableUnits = copyableUnits; + } + + public void add(Manifest.CopyableUnit copyableUnit) { + _copyableUnits.add(copyableUnit); + } + + public static class CopyableUnit { + public final Integer id; + public final String fileName; + public final String fileGroup; Review Comment: Also, do we need to make these fields (fileGroup, fileSize, and modTime) optional? 
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/Manifest.java: ########## @@ -0,0 +1,117 @@ +package org.apache.gobblin.data.management.copy; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * Manifest schema and serDe + * https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Manifest+based+distcp+runbook + */ +public class Manifest { + private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); + private static final Type CopyableUnitListType = new TypeToken<ArrayList<CopyableUnit>>(){}.getType(); + + + public final ArrayList<CopyableUnit> _copyableUnits; + + public Manifest() { + _copyableUnits = new ArrayList<>(); + } + + public Manifest(ArrayList<CopyableUnit> copyableUnits) { + _copyableUnits = copyableUnits; + } + + public void add(Manifest.CopyableUnit copyableUnit) { + _copyableUnits.add(copyableUnit); + } + + public static class CopyableUnit { + public final Integer id; + public final String fileName; + public final String fileGroup; + public final Integer fileSizeInBytes; + public final Integer fileModificationTime; Review Comment: Same here, type should be long? 
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/Manifest.java: ########## @@ -0,0 +1,117 @@ +package org.apache.gobblin.data.management.copy; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * Manifest schema and serDe + * https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Manifest+based+distcp+runbook + */ +public class Manifest { Review Comment: Change class name to be CopyManifest? ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/Manifest.java: ########## @@ -0,0 +1,117 @@ +package org.apache.gobblin.data.management.copy; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * Manifest schema and serDe + * https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Manifest+based+distcp+runbook + */ +public class Manifest { + private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); + private static final Type CopyableUnitListType = new TypeToken<ArrayList<CopyableUnit>>(){}.getType(); + + + public final ArrayList<CopyableUnit> _copyableUnits; + + public Manifest() { + _copyableUnits = new ArrayList<>(); + } 
+ + public Manifest(ArrayList<CopyableUnit> copyableUnits) { + _copyableUnits = copyableUnits; + } + + public void add(Manifest.CopyableUnit copyableUnit) { + _copyableUnits.add(copyableUnit); + } + + public static class CopyableUnit { + public final Integer id; Review Comment: Let's remove id for now: it's not needed and might limit write concurrency, and we can easily add it back in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org