This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5fbb5a76f9ded25e9b6bd15e2db40e1a7c7a0f4d
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Fri May 15 21:33:02 2020 -0700

    [HUDI-902] Avoid exception when getSchemaProvider (#1584)
    
    * When there is no new input data, don't throw an exception for a null SchemaProvider
    * Return the newly added NullSchemaProvider instead
---
 .../apache/hudi/utilities/sources/InputBatch.java  | 24 +++++++++++--
 .../hudi/utilities/sources/TestInputBatch.java     | 40 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index dcf56f3..1acb25b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -19,9 +19,13 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
+
 public class InputBatch<T> {
 
   private final Option<T> batch;
@@ -49,9 +53,25 @@ public class InputBatch<T> {
   }
 
   public SchemaProvider getSchemaProvider() {
-    if (schemaProvider == null) {
+    if (batch.isPresent() && schemaProvider == null) {
       throw new HoodieException("Please provide a valid schema provider class!");
     }
-    return schemaProvider;
+    return Option.ofNullable(schemaProvider).orElse(new NullSchemaProvider());
+  }
+
+  public static class NullSchemaProvider extends SchemaProvider {
+
+    public NullSchemaProvider() {
+      this(null, null);
+    }
+
+    public NullSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+      super(props, jssc);
+    }
+
+    @Override
+    public Schema getSourceSchema() {
+      return Schema.create(Schema.Type.NULL);
+    }
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
new file mode 100644
index 0000000..314ae48
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
@@ -0,0 +1,40 @@
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertSame;
+
+public class TestInputBatch {
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @Test
+  public void getSchemaProviderShouldThrowException() {
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), null, null);
+    exceptionRule.expect(HoodieException.class);
+    exceptionRule.expectMessage("Please provide a valid schema provider class!");
+    inputBatch.getSchemaProvider();
+  }
+
+  @Test
+  public void getSchemaProviderShouldReturnNullSchemaProvider() {
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.empty(), null, null);
+    SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
+    assertTrue(schemaProvider instanceof InputBatch.NullSchemaProvider);
+  }
+
+  @Test
+  public void getSchemaProviderShouldReturnGivenSchemaProvider() {
+    SchemaProvider schemaProvider = new RowBasedSchemaProvider(null);
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), null, schemaProvider);
+    assertSame(schemaProvider, inputBatch.getSchemaProvider());
+  }
+}

Reply via email to