This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git
The following commit(s) were added to refs/heads/master by this push:
new 39f670ac fix(loader): support file name with prefix for hdfs source (#571)
39f670ac is described below
commit 39f670ac21a72c25b6f43213aeb81761c23c954d
Author: YangJiaqi <[email protected]>
AuthorDate: Tue Mar 19 16:41:21 2024 +0800
fix(loader): support file name with prefix for hdfs source (#571)
* use iterator to scan files
---------
Co-authored-by: jacky.yang <[email protected]>
Co-authored-by: imbajin <[email protected]>
---
.../loader/reader/hdfs/HDFSFileReader.java | 41 ++++++++++++++++------
.../loader/test/functional/HDFSLoadTest.java | 27 ++++++++++++++
.../resources/hdfs_file_with_prefix/core-site.xml | 22 ++++++++++++
.../resources/hdfs_file_with_prefix/schema.groovy | 33 +++++++++++++++++
.../hdfs_file_with_prefix/struct_hdfs.json | 19 ++++++++++
5 files changed, 132 insertions(+), 10 deletions(-)
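In short: an HDFS source "path" may now end with a '*' wildcard, e.g. hdfs://host:8020/data/vertex_*. HDFSFileReader splits such a path into its parent directory and a file-name prefix, checks that the directory exists, and then scans it with Hadoop's RemoteIterator instead of materializing all entries via FileUtil.stat2Paths; only entries whose names start with the prefix and pass the configured FileFilter are read.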
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/hdfs/HDFSFileReader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/hdfs/HDFSFileReader.java
index 93f8b545..26e769d6 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/hdfs/HDFSFileReader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/hdfs/HDFSFileReader.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.exception.LoadException;
@@ -52,6 +52,8 @@ public class HDFSFileReader extends FileReader {

    private final FileSystem hdfs;
    private final Configuration conf;
+    private String prefix;
+    private String inputPath;

    public HDFSFileReader(HDFSSource source) {
        super(source);
@@ -62,7 +64,22 @@ public class HDFSFileReader extends FileReader {
        } catch (IOException e) {
            throw new LoadException("Failed to create HDFS file system", e);
        }
-        Path path = new Path(source.path());
+
+        String input = source.path();
+        if (input.contains("*")) {
+            int lastSlashIndex = input.lastIndexOf('/');
+            if (lastSlashIndex != -1) {
+                inputPath = input.substring(0, lastSlashIndex);
+                // TODO: support multiple prefixes in the URI?
+                prefix = input.substring(lastSlashIndex + 1, input.length() - 1);
+            } else {
+                LOG.error("Invalid file path format: {}", input);
+            }
+        } else {
+            inputPath = input;
+        }
+
+        Path path = new Path(inputPath);
        checkExist(this.hdfs, path);
    }
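For illustration only (not part of the patch), the splitting above turns a wildcard path into a directory plus a file-name prefix; the input value below is hypothetical:

    // Hypothetical walk-through of the splitting logic above
    String input = "hdfs://localhost:8020/data/vertex_*";
    int lastSlashIndex = input.lastIndexOf('/');
    String inputPath = input.substring(0, lastSlashIndex);                    // "hdfs://localhost:8020/data"
    String prefix = input.substring(lastSlashIndex + 1, input.length() - 1);  // "vertex_" (trailing '*' dropped)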
@@ -98,22 +115,26 @@ public class HDFSFileReader extends FileReader {

    @Override
    protected List<Readable> scanReadables() throws IOException {
-        Path path = new Path(this.source().path());
+        Path path = new Path(inputPath);
        FileFilter filter = this.source().filter();
        List<Readable> paths = new ArrayList<>();
-        if (this.hdfs.isFile(path)) {
+        FileStatus status = this.hdfs.getFileStatus(path);
+
+        if (status.isFile()) {
            if (!filter.reserved(path.getName())) {
                throw new LoadException("Please check path name and extensions, ensure that " +
                                        "at least one path is available for reading");
            }
            paths.add(new HDFSFile(this.hdfs, path));
        } else {
-            assert this.hdfs.isDirectory(path);
-            FileStatus[] statuses = this.hdfs.listStatus(path);
-            Path[] subPaths = FileUtil.stat2Paths(statuses);
-            for (Path subPath : subPaths) {
-                if (filter.reserved(subPath.getName())) {
-                    paths.add(new HDFSFile(this.hdfs, subPath));
+            assert status.isDirectory();
+            RemoteIterator<FileStatus> iter = this.hdfs.listStatusIterator(path);
+            while (iter.hasNext()) {
+                FileStatus subStatus = iter.next();
+                // check that the file/dir name starts with the prefix and passes the filter
+                if ((prefix == null || prefix.isEmpty() || subStatus.getPath().getName().startsWith(prefix)) &&
+                    filter.reserved(subStatus.getPath().getName())) {
+                    paths.add(new HDFSFile(this.hdfs, subStatus.getPath()));
                }
            }
        }
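As a minimal, self-contained sketch of the scanning approach used above (the class name, file-system configuration, directory, and prefix below are illustrative, not from the patch): listStatusIterator streams directory entries one at a time, so a large directory is never loaded into memory as a full FileStatus[] first.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class PrefixScan {
        public static void main(String[] args) throws IOException {
            // Assumes fs.defaultFS points at a reachable HDFS (or the local FS when testing)
            FileSystem fs = FileSystem.get(new Configuration());
            RemoteIterator<FileStatus> iter = fs.listStatusIterator(new Path("/data"));
            while (iter.hasNext()) {
                FileStatus status = iter.next();
                // Keep only entries whose names start with the wildcard prefix
                if (status.getPath().getName().startsWith("vertex_")) {
                    System.out.println(status.getPath());
                }
            }
        }
    }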
diff --git a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java
index 5a51e17b..4a00c5bf 100644
--- a/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java
+++ b/hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/HDFSLoadTest.java
@@ -72,6 +72,33 @@ public class HDFSLoadTest extends FileLoadTest {
        Assert.assertEquals(5, vertices.size());
    }

+    @Test
+    public void testHDFSWithFilePrefix() {
+        ioUtil.write("vertex_person_0.csv",
+                     "name,age,city",
+                     "marko,29,Beijing");
+
+        ioUtil.write("vertex_person_1.csv",
+                     "name,age,city",
+                     "vadas,27,Hongkong",
+                     "josh,32,Beijing",
+                     "peter,35,Shanghai",
+                     "\"li,nary\",26,\"Wu,han\"");
+
+        String[] args = new String[]{
+                "-f", structPath("hdfs_file_with_prefix/struct.json"),
+                "-s", configPath("hdfs_file_with_prefix/schema.groovy"),
+                "-g", GRAPH,
+                "-h", SERVER,
+                "--batch-insert-threads", "2",
+                "--test-mode", "true"
+        };
+        HugeGraphLoader loader = new HugeGraphLoader(args);
+        loader.load();
+        List<Vertex> vertices = CLIENT.graph().listVertices();
+        Assert.assertEquals(5, vertices.size());
+    }
+
    @Test
    public void testHDFSWithCoreSitePathEmpty() {
        ioUtil.write("vertex_person.csv",
diff --git a/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/core-site.xml b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/core-site.xml
new file mode 100644
index 00000000..7d02c144
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/core-site.xml
@@ -0,0 +1,22 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with this
+ work for additional information regarding copyright ownership. The ASF
+ licenses this file to You under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and limitations
+ under the License.
+ -->
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://localhost:8020</value>
+    </property>
+</configuration>
diff --git a/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/schema.groovy b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/schema.groovy
new file mode 100644
index 00000000..8571b435
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/schema.groovy
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("weight").asDouble().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("date").asText().ifNotExist().create();
+schema.propertyKey("price").asDouble().ifNotExist().create();
+schema.propertyKey("feel").asText().valueList().ifNotExist().create();
+schema.propertyKey("time").asText().valueSet().ifNotExist().create();
+
+schema.vertexLabel("person").properties("name", "age", "city").primaryKeys("name").ifNotExist().create();
+schema.vertexLabel("software").properties("name", "lang", "price").primaryKeys("name").ifNotExist().create();
+
+schema.edgeLabel("knows").sourceLabel("person").targetLabel("person").properties("date", "weight").ifNotExist().create();
+schema.edgeLabel("created").sourceLabel("person").targetLabel("software").properties("date", "weight").ifNotExist().create();
+schema.edgeLabel("use").sourceLabel("person").targetLabel("software").properties("feel", "time").nullableKeys("feel", "time").ifNotExist().create();
diff --git a/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/struct_hdfs.json b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/struct_hdfs.json
new file mode 100644
index 00000000..2b2d54d0
--- /dev/null
+++ b/hugegraph-loader/src/test/resources/hdfs_file_with_prefix/struct_hdfs.json
@@ -0,0 +1,19 @@
+{
+  "vertices": [
+    {
+      "label": "person",
+      "input": {
+        "type": "hdfs",
+        "path": "${store_path}/vertex_*",
+        "core_site_path": "src/test/resources/hdfs_with_core_site_path/core-site.xml",
+        "format": "CSV",
+        "charset": "UTF-8"
+      },
+      "field_mapping": {
+        "name": "name",
+        "age": "age",
+        "city": "city"
+      }
+    }
+  ]
+}