This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ada2ef  [HUDI-902] Avoid exception when getSchemaProvider (#1584)
2ada2ef is described below

commit 2ada2ef50fc373ed3083d0e7a96e5e644be52bfb
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Fri May 15 21:33:02 2020 -0700

    [HUDI-902] Avoid exception when getSchemaProvider (#1584)
    
    * When no new input data, don't throw exception for null SchemaProvider
    * Return the newly added NullSchemaProvider instead
---
 .../apache/hudi/utilities/sources/InputBatch.java  | 24 ++++++++++++--
 .../hudi/utilities/sources/TestInputBatch.java     | 37 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index dcf56f3..f752e0d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -18,10 +18,14 @@
 
 package org.apache.hudi.utilities.sources;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
+
 public class InputBatch<T> {
 
   private final Option<T> batch;
@@ -49,9 +53,25 @@ public class InputBatch<T> {
   }
 
   public SchemaProvider getSchemaProvider() {
-    if (schemaProvider == null) {
+    if (batch.isPresent() && schemaProvider == null) {
       throw new HoodieException("Please provide a valid schema provider class!");
     }
-    return schemaProvider;
+    return Option.ofNullable(schemaProvider).orElse(new NullSchemaProvider());
+  }
+
+  public static class NullSchemaProvider extends SchemaProvider {
+
+    public NullSchemaProvider() {
+      this(null, null);
+    }
+
+    public NullSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+      super(props, jssc);
+    }
+
+    @Override
+    public Schema getSourceSchema() {
+      return Schema.create(Schema.Type.NULL);
+    }
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
new file mode 100644
index 0000000..752621d
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
@@ -0,0 +1,37 @@
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestInputBatch {
+
+  @Test
+  public void getSchemaProviderShouldThrowException() {
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), null, null);
+    Throwable t = assertThrows(HoodieException.class, inputBatch::getSchemaProvider);
+    assertEquals("Please provide a valid schema provider class!", t.getMessage());
+  }
+
+  @Test
+  public void getSchemaProviderShouldReturnNullSchemaProvider() {
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.empty(), null, null);
+    SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
+    assertTrue(schemaProvider instanceof InputBatch.NullSchemaProvider);
+  }
+
+  @Test
+  public void getSchemaProviderShouldReturnGivenSchemaProvider() {
+    SchemaProvider schemaProvider = new RowBasedSchemaProvider(null);
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), null, schemaProvider);
+    assertSame(schemaProvider, inputBatch.getSchemaProvider());
+  }
+}

Reply via email to