This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e20b77b  HUDI-92 : Making deltastreamer with DistributedTestSource also run locally
e20b77b is described below

commit e20b77be3b1e14039d659cd56bec163708e1b24d
Author: Vinoth Chandar <vin...@apache.org>
AuthorDate: Fri Jul 19 04:53:28 2019 -0700

    HUDI-92 : Making deltastreamer with DistributedTestSource also run locally
    
     - Separating out the test data generators per partition (see the sketch below)
     - Minor logging improvements on IOHandle performance
---
 .gitignore                                         |  1 +
 .../com/uber/hoodie/io/HoodieAppendHandle.java     | 25 +++++++++-----
 .../com/uber/hoodie/io/HoodieCreateHandle.java     |  5 ++-
 .../java/com/uber/hoodie/io/HoodieMergeHandle.java | 22 ++++++++-----
 .../utilities/sources/AbstractBaseTestSource.java  | 38 ++++++++++++----------
 .../sources/DistributedTestDataSource.java         | 10 +++---
 .../hoodie/utilities/sources/TestDataSource.java   |  7 ++--
 7 files changed, 63 insertions(+), 45 deletions(-)

diff --git a/.gitignore b/.gitignore
index 3304d70..f4f680a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@ target/
 .DS_Store
  
 *.class
+.java-version
  
 # Package Files #
 *.jar
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index 99d3c63..64452d7 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -27,6 +27,7 @@ import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.model.HoodieWriteStat;
 import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
 import com.uber.hoodie.common.table.TableFileSystemView.RealtimeView;
 import com.uber.hoodie.common.table.log.HoodieLogFormat;
@@ -247,17 +248,23 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
       if (writer != null) {
         writer.close();
       }
-      writeStatus.getStat().setFileId(this.fileId);
-      writeStatus.getStat().setNumWrites(recordsWritten);
-      writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
-      writeStatus.getStat().setNumInserts(insertRecordsWritten);
-      writeStatus.getStat().setNumDeletes(recordsDeleted);
-      writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten);
-      writeStatus.getStat().setFileSizeInBytes(sizeInBytes);
-      writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords());
+
+      HoodieWriteStat stat = writeStatus.getStat();
+      stat.setFileId(this.fileId);
+      stat.setNumWrites(recordsWritten);
+      stat.setNumUpdateWrites(updatedRecordsWritten);
+      stat.setNumInserts(insertRecordsWritten);
+      stat.setNumDeletes(recordsDeleted);
+      stat.setTotalWriteBytes(estimatedNumberOfBytesWritten);
+      stat.setFileSizeInBytes(sizeInBytes);
+      stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
       RuntimeStats runtimeStats = new RuntimeStats();
       runtimeStats.setTotalUpsertTime(timer.endTimer());
-      writeStatus.getStat().setRuntimeStats(runtimeStats);
+      stat.setRuntimeStats(runtimeStats);
+
+      logger.info(String.format("AppendHandle for partitionPath %s fileID %s, took %d ms.",
+          stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime()));
+
       return writeStatus;
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to close UpdateHandle", e);
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
index cc4596e..3830313 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
@@ -72,7 +72,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
       throw new HoodieInsertException(
           "Failed to initialize HoodieStorageWriter for path " + path, e);
     }
-    logger.info("New InsertHandle for partition :" + partitionPath + " with fileId " + fileId);
+    logger.info("New CreateHandle for partition :" + partitionPath + " with fileId " + fileId);
   }
 
   /**
@@ -172,6 +172,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
       stat.setRuntimeStats(runtimeStats);
       writeStatus.setStat(stat);
 
+      logger.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.",
+          stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalCreateTime()));
+
       return writeStatus;
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index b22bcb3..bd6650e 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -327,16 +327,22 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       }
 
       long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
-      writeStatus.getStat().setTotalWriteBytes(fileSizeInBytes);
-      writeStatus.getStat().setFileSizeInBytes(fileSizeInBytes);
-      writeStatus.getStat().setNumWrites(recordsWritten);
-      writeStatus.getStat().setNumDeletes(recordsDeleted);
-      writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
-      writeStatus.getStat().setNumInserts(insertRecordsWritten);
-      writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords());
+      HoodieWriteStat stat = writeStatus.getStat();
+
+      stat.setTotalWriteBytes(fileSizeInBytes);
+      stat.setFileSizeInBytes(fileSizeInBytes);
+      stat.setNumWrites(recordsWritten);
+      stat.setNumDeletes(recordsDeleted);
+      stat.setNumUpdateWrites(updatedRecordsWritten);
+      stat.setNumInserts(insertRecordsWritten);
+      stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
       RuntimeStats runtimeStats = new RuntimeStats();
       runtimeStats.setTotalUpsertTime(timer.endTimer());
-      writeStatus.getStat().setRuntimeStats(runtimeStats);
+      stat.setRuntimeStats(runtimeStats);
+
+      logger.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.",
+          stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime()));
+
       return writeStatus;
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to close UpdateHandle", e);
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java
index 22b1324..b61cef6 100644
--- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java
@@ -9,9 +9,8 @@ import com.uber.hoodie.utilities.schema.SchemaProvider;
 import com.uber.hoodie.utilities.sources.config.TestSourceConfig;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Stream;
 import org.apache.avro.generic.GenericRecord;
@@ -21,34 +20,35 @@ import org.apache.spark.sql.SparkSession;
 
 public abstract class AbstractBaseTestSource extends AvroSource {
 
+  static final int DEFAULT_PARTITION_NUM = 0;
+
   // Static instance, helps with reuse across a test.
-  protected static transient HoodieTestDataGenerator dataGenerator;
+  protected static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<>();
 
   public static void initDataGen() {
-    dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
+    dataGeneratorMap.putIfAbsent(DEFAULT_PARTITION_NUM,
+        new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS));
   }
 
-  public static void initDataGen(TypedProperties props) {
+  public static void initDataGen(TypedProperties props, int partition) {
     try {
       boolean useRocksForTestDataGenKeys = props.getBoolean(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS,
           TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
-      String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, null);
-      if (null == baseStoreDir) {
-        baseStoreDir = File.createTempFile("test_data_gen", ".keys").getParent();
-      }
+      String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS,
+          File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition;
       log.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir);
-      dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
-          useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>());
+      dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
+          useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>()));
     } catch (IOException e) {
       throw new HoodieIOException(e.getMessage(), e);
     }
   }
 
   public static void resetDataGen() {
-    if (null != dataGenerator) {
+    for (HoodieTestDataGenerator dataGenerator : dataGeneratorMap.values()) {
       dataGenerator.close();
     }
-    dataGenerator = null;
+    dataGeneratorMap.clear();
   }
 
   protected AbstractBaseTestSource(TypedProperties props,
@@ -57,10 +57,13 @@ public abstract class AbstractBaseTestSource extends AvroSource {
     super(props, sparkContext, sparkSession, schemaProvider);
   }
 
-  protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime) {
+  protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime,
+      int partition) {
     int maxUniqueKeys = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP,
         TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
 
+    HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);
+
     // generate `sourceLimit` number of upserts each time.
     int numExistingKeys = dataGenerator.getNumExistingKeys();
     log.info("NumExistingKeys=" + numExistingKeys);
@@ -84,15 +87,14 @@ public abstract class AbstractBaseTestSource extends AvroSource {
     log.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
         + ", Free Memory=" + Runtime.getRuntime().freeMemory());
 
-    List<GenericRecord> records = new ArrayList<>();
     Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
-        .map(AbstractBaseTestSource::toGenericRecord);
+        .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
     Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
-        .map(AbstractBaseTestSource::toGenericRecord);
+        .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
     return Stream.concat(updateStream, insertStream);
   }
 
-  private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
+  private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
     try {
       Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
       return (GenericRecord) recordOpt.get();
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java
index 533e25e..161c623 100644
--- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java
@@ -66,14 +66,14 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
     newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition);
     int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions));
     JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed()
-        .collect(Collectors.toList()), numTestSourcePartitions).mapPartitions(idx -> {
+        .collect(Collectors.toList()), numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> {
           log.info("Initializing source with newProps=" + newProps);
-          if (null == dataGenerator) {
-            initDataGen(newProps);
+          if (!dataGeneratorMap.containsKey(p)) {
+            initDataGen(newProps, p);
           }
-          Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime).iterator();
+          Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime, p).iterator();
           return itr;
-        });
+        }, true);
     return new InputBatch<>(Optional.of(avroRDD), commitTime);
   }
 }
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java
index e4bd4ff..99b95d3 100644
--- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java
@@ -40,9 +40,7 @@ public class TestDataSource extends AbstractBaseTestSource {
   public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
-    if (null == dataGenerator) {
-      initDataGen(props);
-    }
+    initDataGen();
   }
 
   @Override
@@ -58,7 +56,8 @@ public class TestDataSource extends AbstractBaseTestSource {
       return new InputBatch<>(Optional.empty(), commitTime);
     }
 
-    List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime).collect(Collectors.toList());
+    List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime, DEFAULT_PARTITION_NUM)
+        .collect(Collectors.toList());
     JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4);
     return new InputBatch<>(Optional.of(avroRDD), commitTime);
   }
