This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 2ada2ef  [HUDI-902] Avoid exception when getSchemaProvider (#1584)
2ada2ef is described below

commit 2ada2ef50fc373ed3083d0e7a96e5e644be52bfb
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Fri May 15 21:33:02 2020 -0700

    [HUDI-902] Avoid exception when getSchemaProvider (#1584)

    * When no new input data, don't throw exception for null SchemaProvider
    * Return the newly added NullSchemaProvider instead
---
 .../apache/hudi/utilities/sources/InputBatch.java  | 24 ++++++++++++--
 .../hudi/utilities/sources/TestInputBatch.java     | 37 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index dcf56f3..f752e0d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -18,10 +18,14 @@
 
 package org.apache.hudi.utilities.sources;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
+
 public class InputBatch<T> {
 
   private final Option<T> batch;
@@ -49,9 +53,25 @@ public class InputBatch<T> {
   }
 
   public SchemaProvider getSchemaProvider() {
-    if (schemaProvider == null) {
+    if (batch.isPresent() && schemaProvider == null) {
       throw new HoodieException("Please provide a valid schema provider class!");
     }
-    return schemaProvider;
+    return Option.ofNullable(schemaProvider).orElse(new NullSchemaProvider());
+  }
+
+  public static class NullSchemaProvider extends SchemaProvider {
+
+    public NullSchemaProvider() {
+      this(null, null);
+    }
+
+    public NullSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+      super(props, jssc);
+    }
+
+    @Override
+    public Schema getSourceSchema() {
+      return Schema.create(Schema.Type.NULL);
+    }
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
new file mode 100644
index 0000000..752621d
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
@@ -0,0 +1,37 @@
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestInputBatch {
+
+  @Test
+  public void getSchemaProviderShouldThrowException() {
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), null, null);
+    Throwable t = assertThrows(HoodieException.class, inputBatch::getSchemaProvider);
+    assertEquals("Please provide a valid schema provider class!", t.getMessage());
+  }
+
+  @Test
+  public void getSchemaProviderShouldReturnNullSchemaProvider() {
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.empty(), null, null);
+    SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
+    assertTrue(schemaProvider instanceof InputBatch.NullSchemaProvider);
+  }
+
+  @Test
+  public void getSchemaProviderShouldReturnGivenSchemaProvider() {
+    SchemaProvider schemaProvider = new RowBasedSchemaProvider(null);
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), null, schemaProvider);
+    assertSame(schemaProvider, inputBatch.getSchemaProvider());
+  }
+}