This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new e20b77b HUDI-92 : Making deltastreamer with DistributedTestSource also run locally e20b77b is described below commit e20b77be3b1e14039d659cd56bec163708e1b24d Author: Vinoth Chandar <vin...@apache.org> AuthorDate: Fri Jul 19 04:53:28 2019 -0700 HUDI-92 : Making deltastreamer with DistributedTestSource also run locally - Separating out the test data generators per partition - Minor logging improvements on IOHandle performance --- .gitignore | 1 + .../com/uber/hoodie/io/HoodieAppendHandle.java | 25 +++++++++----- .../com/uber/hoodie/io/HoodieCreateHandle.java | 5 ++- .../java/com/uber/hoodie/io/HoodieMergeHandle.java | 22 ++++++++----- .../utilities/sources/AbstractBaseTestSource.java | 38 ++++++++++++---------- .../sources/DistributedTestDataSource.java | 10 +++--- .../hoodie/utilities/sources/TestDataSource.java | 7 ++-- 7 files changed, 63 insertions(+), 45 deletions(-) diff --git a/.gitignore b/.gitignore index 3304d70..f4f680a 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ target/ .DS_Store *.class +.java-version # Package Files # *.jar diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 99d3c63..64452d7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -27,6 +27,7 @@ import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.table.TableFileSystemView.RealtimeView; import com.uber.hoodie.common.table.log.HoodieLogFormat; @@ -247,17 +248,23 @@ public class 
HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri if (writer != null) { writer.close(); } - writeStatus.getStat().setFileId(this.fileId); - writeStatus.getStat().setNumWrites(recordsWritten); - writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); - writeStatus.getStat().setNumInserts(insertRecordsWritten); - writeStatus.getStat().setNumDeletes(recordsDeleted); - writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten); - writeStatus.getStat().setFileSizeInBytes(sizeInBytes); - writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords()); + + HoodieWriteStat stat = writeStatus.getStat(); + stat.setFileId(this.fileId); + stat.setNumWrites(recordsWritten); + stat.setNumUpdateWrites(updatedRecordsWritten); + stat.setNumInserts(insertRecordsWritten); + stat.setNumDeletes(recordsDeleted); + stat.setTotalWriteBytes(estimatedNumberOfBytesWritten); + stat.setFileSizeInBytes(sizeInBytes); + stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalUpsertTime(timer.endTimer()); - writeStatus.getStat().setRuntimeStats(runtimeStats); + stat.setRuntimeStats(runtimeStats); + + logger.info(String.format("AppendHandle for partitionPath %s fileID %s, took %d ms.", + stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime())); + return writeStatus; } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index cc4596e..3830313 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -72,7 +72,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri throw new HoodieInsertException( "Failed to initialize 
HoodieStorageWriter for path " + path, e); } - logger.info("New InsertHandle for partition :" + partitionPath + " with fileId " + fileId); + logger.info("New CreateHandle for partition :" + partitionPath + " with fileId " + fileId); } /** @@ -172,6 +172,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri stat.setRuntimeStats(runtimeStats); writeStatus.setStat(stat); + logger.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", + stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalCreateTime())); + return writeStatus; } catch (IOException e) { throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index b22bcb3..bd6650e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -327,16 +327,22 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit } long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath); - writeStatus.getStat().setTotalWriteBytes(fileSizeInBytes); - writeStatus.getStat().setFileSizeInBytes(fileSizeInBytes); - writeStatus.getStat().setNumWrites(recordsWritten); - writeStatus.getStat().setNumDeletes(recordsDeleted); - writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); - writeStatus.getStat().setNumInserts(insertRecordsWritten); - writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords()); + HoodieWriteStat stat = writeStatus.getStat(); + + stat.setTotalWriteBytes(fileSizeInBytes); + stat.setFileSizeInBytes(fileSizeInBytes); + stat.setNumWrites(recordsWritten); + stat.setNumDeletes(recordsDeleted); + stat.setNumUpdateWrites(updatedRecordsWritten); + stat.setNumInserts(insertRecordsWritten); + 
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalUpsertTime(timer.endTimer()); - writeStatus.getStat().setRuntimeStats(runtimeStats); + stat.setRuntimeStats(runtimeStats); + + logger.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", + stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime())); + return writeStatus; } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java index 22b1324..b61cef6 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java @@ -9,9 +9,8 @@ import com.uber.hoodie.utilities.schema.SchemaProvider; import com.uber.hoodie.utilities.sources.config.TestSourceConfig; import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Stream; import org.apache.avro.generic.GenericRecord; @@ -21,34 +20,35 @@ import org.apache.spark.sql.SparkSession; public abstract class AbstractBaseTestSource extends AvroSource { + static final int DEFAULT_PARTITION_NUM = 0; + // Static instance, helps with reuse across a test. 
- protected static transient HoodieTestDataGenerator dataGenerator; + protected static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<>(); public static void initDataGen() { - dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); + dataGeneratorMap.putIfAbsent(DEFAULT_PARTITION_NUM, + new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)); } - public static void initDataGen(TypedProperties props) { + public static void initDataGen(TypedProperties props, int partition) { try { boolean useRocksForTestDataGenKeys = props.getBoolean(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS); - String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, null); - if (null == baseStoreDir) { - baseStoreDir = File.createTempFile("test_data_gen", ".keys").getParent(); - } + String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, + File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition; log.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir); - dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, - useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>()); + dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, + useRocksForTestDataGenKeys ? 
new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>())); } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } } public static void resetDataGen() { - if (null != dataGenerator) { + for (HoodieTestDataGenerator dataGenerator : dataGeneratorMap.values()) { dataGenerator.close(); } - dataGenerator = null; + dataGeneratorMap.clear(); } protected AbstractBaseTestSource(TypedProperties props, @@ -57,10 +57,13 @@ public abstract class AbstractBaseTestSource extends AvroSource { super(props, sparkContext, sparkSession, schemaProvider); } - protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime) { + protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime, + int partition) { int maxUniqueKeys = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS); + HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition); + // generate `sourceLimit` number of upserts each time. int numExistingKeys = dataGenerator.getNumExistingKeys(); log.info("NumExistingKeys=" + numExistingKeys); @@ -84,15 +87,14 @@ public abstract class AbstractBaseTestSource extends AvroSource { log.info("Before DataGen. 
Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory() + ", Free Memory=" + Runtime.getRuntime().freeMemory()); - List<GenericRecord> records = new ArrayList<>(); Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates) - .map(AbstractBaseTestSource::toGenericRecord); + .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts) - .map(AbstractBaseTestSource::toGenericRecord); + .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); return Stream.concat(updateStream, insertStream); } - private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) { + private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) { try { Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema); return (GenericRecord) recordOpt.get(); diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java index 533e25e..161c623 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java @@ -66,14 +66,14 @@ public class DistributedTestDataSource extends AbstractBaseTestSource { newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition); int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions)); JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed() - .collect(Collectors.toList()), numTestSourcePartitions).mapPartitions(idx -> { + .collect(Collectors.toList()), 
numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> { log.info("Initializing source with newProps=" + newProps); - if (null == dataGenerator) { - initDataGen(newProps); + if (!dataGeneratorMap.containsKey(p)) { + initDataGen(newProps, p); } - Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime).iterator(); + Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime, p).iterator(); return itr; - }); + }, true); return new InputBatch<>(Optional.of(avroRDD), commitTime); } } diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java index e4bd4ff..99b95d3 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java @@ -40,9 +40,7 @@ public class TestDataSource extends AbstractBaseTestSource { public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); - if (null == dataGenerator) { - initDataGen(props); - } + initDataGen(); } @Override @@ -58,7 +56,8 @@ public class TestDataSource extends AbstractBaseTestSource { return new InputBatch<>(Optional.empty(), commitTime); } - List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime).collect(Collectors.toList()); + List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime, DEFAULT_PARTITION_NUM) + .collect(Collectors.toList()); JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4); return new InputBatch<>(Optional.of(avroRDD), commitTime); }