This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 19c977c  SAMZA-2309: Remove readFn requirement for remote tables 
(#1144)
19c977c is described below

commit 19c977ce67ea560f669c790e2f855bfc2323cbcb
Author: Daniel Chen <[email protected]>
AuthorDate: Fri Aug 23 14:29:46 2019 -0700

    SAMZA-2309: Remove readFn requirement for remote tables (#1144)
---
 .../table/descriptors/RemoteTableDescriptor.java   | 12 +++++---
 .../samza/table/remote/AsyncRemoteTable.java       | 10 +++++--
 .../org/apache/samza/table/remote/RemoteTable.java |  7 +++--
 .../samza/table/remote/TestAsyncRemoteTable.java   |  4 +--
 .../apache/samza/table/remote/TestRemoteTable.java | 15 +++++++++-
 .../descriptors/TestRemoteTableDescriptor.java     | 33 +++++++++++++++++++++-
 6 files changed, 69 insertions(+), 12 deletions(-)

diff --git 
a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
 
b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index b0590c5..3eed914 100644
--- 
a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ 
b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -77,7 +77,7 @@ public class RemoteTableDescriptor<K, V> extends 
BaseTableDescriptor<K, V, Remot
   public static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
   public static final String BATCH_PROVIDER = "io.batch.provider";
 
-  // Input support for a specific remote store (required)
+  // Input support for a specific remote store (optional)
   private TableReadFunction<K, V> readFn;
 
   // Output support for a specific remote store (optional)
@@ -86,6 +86,7 @@ public class RemoteTableDescriptor<K, V> extends 
BaseTableDescriptor<K, V, Remot
   // Rate limiter for client-side throttling; it is set by withRateLimiter()
   private RateLimiter rateLimiter;
 
+  // Indicate whether read rate limiter is enabled or not
   private boolean enableReadRateLimiter = true;
 
   // Indicate whether write rate limiter is enabled or not
@@ -327,8 +328,10 @@ public class RemoteTableDescriptor<K, V> extends 
BaseTableDescriptor<K, V, Remot
     addTableConfig(ASYNC_CALLBACK_POOL_SIZE, 
String.valueOf(asyncCallbackPoolSize), tableConfig);
 
     // Handle table reader function
-    addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), 
tableConfig);
-    addTablePartConfig(READ_FN, readFn, jobConfig, tableConfig);
+    if (readFn != null) {
+      addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), 
tableConfig);
+      addTablePartConfig(READ_FN, readFn, jobConfig, tableConfig);
+    }
 
     // Handle table write function
     if (writeFn != null) {
@@ -345,7 +348,8 @@ public class RemoteTableDescriptor<K, V> extends 
BaseTableDescriptor<K, V, Remot
 
   @Override
   protected void validate() {
-    Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
+    Preconditions.checkArgument(writeFn != null || readFn != null,
+        "Must have one of TableReadFunction or TableWriteFunction");
     Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
         "Only one of rateLimiter instance or read/write limits can be 
specified");
     // Assume callback executor pool should have no more than 20 threads
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java 
b/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
index 4b1851b..ebe4858 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/AsyncRemoteTable.java
@@ -42,13 +42,15 @@ public class AsyncRemoteTable<K, V> implements 
AsyncReadWriteTable<K, V> {
   private final TableWriteFunction<K, V> writeFn;
 
   public AsyncRemoteTable(TableReadFunction<K, V> readFn, 
TableWriteFunction<K, V> writeFn) {
-    Preconditions.checkNotNull(readFn, "null readFn");
+    Preconditions.checkArgument(writeFn != null || readFn != null,
+        "Must have one of TableReadFunction or TableWriteFunction");
     this.readFn = readFn;
     this.writeFn = writeFn;
   }
 
   @Override
   public CompletableFuture<V> getAsync(K key, Object ... args) {
+    Preconditions.checkNotNull(readFn, "null readFn");
     return args.length > 0
         ? readFn.getAsync(key, args)
         : readFn.getAsync(key);
@@ -56,6 +58,7 @@ public class AsyncRemoteTable<K, V> implements 
AsyncReadWriteTable<K, V> {
 
   @Override
   public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys, Object ... 
args) {
+    Preconditions.checkNotNull(readFn, "null readFn");
     return args.length > 0
         ? readFn.getAllAsync(keys, args)
         : readFn.getAllAsync(keys);
@@ -63,6 +66,7 @@ public class AsyncRemoteTable<K, V> implements 
AsyncReadWriteTable<K, V> {
 
   @Override
   public <T> CompletableFuture<T> readAsync(int opId, Object... args) {
+    Preconditions.checkNotNull(readFn, "null readFn");
     return readFn.readAsync(opId, args);
   }
 
@@ -119,7 +123,9 @@ public class AsyncRemoteTable<K, V> implements 
AsyncReadWriteTable<K, V> {
 
   @Override
   public void close() {
-    readFn.close();
+    if (readFn != null) {
+      readFn.close();
+    }
     if (writeFn != null) {
       writeFn.close();
     }
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
index 85f612c..6d6c23a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
@@ -130,7 +130,8 @@ public final class RemoteTable<K, V> extends 
BaseReadWriteTable<K, V>
       ExecutorService callbackExecutor) {
 
     super(tableId);
-    Preconditions.checkNotNull(readFn, "null readFn");
+    Preconditions.checkArgument(writeFn != null || readFn != null,
+        "Must have one of TableReadFunction or TableWriteFunction");
 
     this.readFn = readFn;
     this.writeFn = writeFn;
@@ -348,7 +349,9 @@ public final class RemoteTable<K, V> extends 
BaseReadWriteTable<K, V>
   public void init(Context context) {
     super.init(context);
     asyncTable.init(context);
-    readFn.init(context, this);
+    if (readFn != null) {
+      readFn.init(context, this);
+    }
     if (writeFn != null) {
       writeFn.init(context, this);
     }
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java
 
b/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java
index d557c31..20706dc 100644
--- 
a/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java
+++ 
b/samza-core/src/test/java/org/apache/samza/table/remote/TestAsyncRemoteTable.java
@@ -176,8 +176,8 @@ public class TestAsyncRemoteTable {
     verify(writeFn, times(1)).flush();
   }
 
-  @Test(expected = NullPointerException.class)
-  public void testFailOnNullReadFn() {
+  @Test(expected = IllegalArgumentException.class)
+  public void testFailOnNullReadFnAndWriteFn() {
     new AsyncRemoteTable(null, null);
   }
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java 
b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
index 7a98504..718aa2c 100644
--- 
a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ 
b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -88,7 +88,9 @@ public class TestRemoteTable {
         readRateLimiter, writeRateLimiter, rateLimitingExecutor,
         readPolicy, writePolicy, retryExecutor, null, null, cbExecutor);
     table.init(getMockContext());
-    verify(readFn, times(1)).init(any(), any());
+    if (readFn != null) {
+      verify(readFn, times(1)).init(any(), any());
+    }
     if (writeFn != null) {
       verify(writeFn, times(1)).init(any(), any());
     }
@@ -122,6 +124,17 @@ public class TestRemoteTable {
     verify(table.readRateLimiter, times(error && retry ? 2 : 
1)).throttle(anyString());
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testFailOnNullReadFnAndWriteFn() {
+    getTable("id", null, null, false);
+  }
+
+  @Test
+  public void testSucceedValidationOnNullReadFn() {
+    RemoteTable<String, String> table = getTable("tableId", null, 
mock(TableWriteFunction.class), false);
+    Assert.assertNotNull(table);
+  }
+
   @Test
   public void testInit() {
     String tableId = "testInit";
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
 
b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index ce89c5a..cf03599 100644
--- 
a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ 
b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -100,6 +100,37 @@ public class TestRemoteTableDescriptor {
   }
 
   @Test
+  public void testValidateOnlyReadOrWriteFn() {
+    // Only read defined
+    String tableId = "1";
+    RemoteTableDescriptor desc = new RemoteTableDescriptor(tableId)
+        .withReadFunction(createMockTableReadFunction())
+        .withReadRateLimiterDisabled();
+    Map<String, String> tableConfig = desc.toConfig(new MapConfig());
+    Assert.assertNotNull(tableConfig);
+
+    // Only write defined
+    String tableId2 = "2";
+    RemoteTableDescriptor desc2 = new RemoteTableDescriptor(tableId2)
+        .withWriteFunction(createMockTableWriteFunction())
+        .withWriteRateLimiterDisabled();
+    tableConfig = desc2.toConfig(new MapConfig());
+    Assert.assertNotNull(tableConfig);
+
+    // Neither read or write defined (Failure case)
+    String tableId3 = "3";
+    RemoteTableDescriptor desc3 = new RemoteTableDescriptor(tableId3);
+    try {
+      desc3.toConfig(new MapConfig());
+      Assert.fail("Should not allow neither readFn or writeFn defined");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof IllegalArgumentException);
+      Assert.assertTrue(e.getMessage().contains("Must have one of 
TableReadFunction or TableWriteFunction"));
+    }
+  }
+
+
+  @Test
   public void testSerializeSimple() {
     doTestSerialize(null, null, null);
   }
@@ -135,7 +166,7 @@ public class TestRemoteTableDescriptor {
     assertEquals(null, RemoteTableDescriptor.WRITE_FN, tableId, tableConfig);
   }
 
-  @Test(expected = NullPointerException.class)
+  @Test(expected = IllegalArgumentException.class)
   public void testSerializeNullReadFunction() {
     RemoteTableDescriptor desc = new RemoteTableDescriptor("1");
     Map<String, String> tableConfig = desc.toConfig(new MapConfig());

Reply via email to