This is an automated email from the ASF dual-hosted git repository.

vaughn pushed a commit to branch fix_zyxxoo
in repository 
https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git

commit 6e05f9dd7cf7108445a6abe569545601d83eb375
Author: vaughn.zhang <[email protected]>
AuthorDate: Mon Jan 29 17:01:32 2024 +0800

    fix: concurrency issue causing file overwrite due to identical filenames
---
 .../loader/direct/loader/HBaseDirectLoader.java    | 65 +++++++++++++++++++++-
 1 file changed, 63 insertions(+), 2 deletions(-)

diff --git 
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
 
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
index fd1a0236..3526b5cb 100644
--- 
a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
+++ 
b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
@@ -18,10 +18,13 @@
 package org.apache.hugegraph.loader.direct.loader;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
@@ -59,8 +62,66 @@ public class HBaseDirectLoader extends 
DirectLoader<ImmutableBytesWritable, KeyV
     private SinkToHBase sinkToHBase;
     private LoadDistributeMetrics loadDistributeMetrics;
 
+    private static final int RANDOM_VALUE1;
+    private static final short RANDOM_VALUE2;
+    private static final AtomicInteger NEXT_COUNTER;
+
     public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
 
+    static {
+        try {
+            SecureRandom secureRandom = new SecureRandom();
+            RANDOM_VALUE1 = secureRandom.nextInt(0x01000000);
+            RANDOM_VALUE2 = (short) secureRandom.nextInt(0x00008000);
+            NEXT_COUNTER = new AtomicInteger(new SecureRandom().nextInt());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static byte int3(final int x) {
+        return (byte) (x >> 24);
+    }
+
+    private static byte int2(final int x) {
+        return (byte) (x >> 16);
+    }
+
+    private static byte int1(final int x) {
+        return (byte) (x >> 8);
+    }
+
+    private static byte int0(final int x) {
+        return (byte) (x);
+    }
+
+    private static byte short1(final short x) {
+        return (byte) (x >> 8);
+    }
+
+    private static byte short0(final short x) {
+        return (byte) (x);
+    }
+
+    public static String fileID() {
+        long timeStamp = System.currentTimeMillis() / 1000;
+        ByteBuffer byteBuffer = ByteBuffer.allocate(12);
+        byteBuffer.put(int3((int) timeStamp));
+        byteBuffer.put(int2((int) timeStamp));
+        byteBuffer.put(int1((int) timeStamp));
+        byteBuffer.put(int0((int) timeStamp));
+        byteBuffer.put(int2(RANDOM_VALUE1));
+        byteBuffer.put(int1(RANDOM_VALUE1));
+        byteBuffer.put(int0(RANDOM_VALUE1));
+        byteBuffer.put(short1(RANDOM_VALUE2));
+        byteBuffer.put(short0(RANDOM_VALUE2));
+        byteBuffer.put(int2(NEXT_COUNTER.incrementAndGet()));
+        byteBuffer.put(int1(NEXT_COUNTER.incrementAndGet()));
+        byteBuffer.put(int0(NEXT_COUNTER.incrementAndGet()));
+
+        return Bytes.toHex(byteBuffer.array());
+    }
+
     public HBaseDirectLoader(LoadOptions loadOptions,
                              InputStruct struct,
                              LoadDistributeMetrics loadDistributeMetrics) {
@@ -144,8 +205,8 @@ public class HBaseDirectLoader extends 
DirectLoader<ImmutableBytesWritable, KeyV
 
     public String getHFilePath(Configuration conf) throws IOException {
         FileSystem fs = FileSystem.get(conf);
-        long timeStr = System.currentTimeMillis();
-        String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + 
"/" + timeStr + "/";
+        String fileID = fileID();
+        String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + 
"/" + fileID + "/";
         Path hfileGenPath = new Path(pathStr);
         if (fs.exists(hfileGenPath)) {
             LOG.info("\n Delete the path where the hfile is generated,path {} 
", pathStr);

Reply via email to