This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0851c75  Added api for putting state via cli/rest (#4311)
0851c75 is described below

commit 0851c75a5b1f3bda4fd7943bbd568888079c9383
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Mon Jun 3 17:39:48 2019 -0700

    Added api for putting state via cli/rest (#4311)
    
    * Added api for putting state via cli/rest
    
    * Addressed comments
    
    * More comments
    
    * Took feedback into account
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    | 20 +++++
 .../functions/worker/PulsarFunctionStateTest.java  | 13 +++
 .../org/apache/pulsar/client/admin/Functions.java  | 27 ++++++
 .../client/admin/internal/FunctionsImpl.java       | 17 ++++
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 28 ++++++-
 .../pulsar/common/functions/FunctionState.java     |  1 +
 .../python-examples/wordcount_function.py          |  1 +
 .../functions/worker/rest/api/ComponentImpl.java   | 98 ++++++++++++++++++++--
 .../worker/rest/api/v3/FunctionsApiV3Resource.java | 11 +++
 9 files changed, 208 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 21ecd2e..ec94d04 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -277,6 +277,26 @@ public class FunctionsBase extends AdminResource 
implements Supplier<WorkerServi
     }
 
     @POST
+    @ApiOperation(
+            value = "Put the state associated with a Pulsar Function"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
+            @ApiResponse(code = 404, message = "The function does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public void putFunctionState(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String 
namespace,
+                                 final @PathParam("functionName") String 
functionName,
+                                 final @PathParam("key") String key,
+                                 final @FormDataParam("state") FunctionState 
stateJson) {
+        functions.putFunctionState(tenant, namespace, functionName, key, 
stateJson, clientAppId(), clientAuthData());
+    }
+
+    @POST
     @ApiOperation(value = "Restart function instance", response = Void.class)
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
index 70f5cf7..0b5c340 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
@@ -377,6 +377,19 @@ public class PulsarFunctionStateTest {
             Assert.assertEquals(e.getStatusCode(), 
Response.Status.NOT_FOUND.getStatusCode());
         }
 
+        FunctionState newState = new FunctionState("foobar", "foobarvalue", 
null, 0l, 0l);
+        try {
+            admin.functions().putFunctionState(tenant, namespacePortion, 
functionName + "bar", newState);
+            Assert.fail("Should have failed since function doesn't exist");
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 
Response.Status.NOT_FOUND.getStatusCode());
+        }
+
+        // This succeeds because function name is correct
+        admin.functions().putFunctionState(tenant, namespacePortion, 
functionName, newState);
+        state = admin.functions().getFunctionState(tenant, namespacePortion, 
functionName, "foobar");
+        Assert.assertTrue(state.getStringValue().equals("foobarvalue"));
+
         // validate pulsar-sink consumer has consumed all messages and 
delivered to Pulsar sink but unacked messages
         // due to publish failure
         
assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index a3edb8a..977d195 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -485,4 +485,31 @@ public interface Functions {
      *             Unexpected error
      */
     FunctionState getFunctionState(String tenant, String namespace, String 
function, String key) throws PulsarAdminException;
+
+    /**
+     * Puts the given state associated with a Pulsar Function.
+     * <p>
+     * State Example:
+     *
+     * <pre>
+     * <code>{ "key" : "foobar", "stringValue" : "foobarvalue" }</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param state
+     *            FunctionState
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to put the state of the function
+     * @throws NotFoundException
+     *             Function doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void putFunctionState(String tenant, String namespace, String function, 
FunctionState state) throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 2cb916d..28cfabb 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -463,6 +463,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
         }
     }
 
+    @Override
     public FunctionState getFunctionState(String tenant, String namespace, 
String function, String key)
         throws PulsarAdminException {
         try {
@@ -476,4 +477,20 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
             throw getApiException(e);
         }
     }
+
+    @Override
+    public void putFunctionState(String tenant, String namespace, String 
function, FunctionState state)
+            throws PulsarAdminException {
+        try {
+             RequestBuilder builder = 
post(functions.path(tenant).path(namespace).path(function).path("state").path(state.getKey()).getUri().toASCIIString());
+             builder.addBodyPart(new StringPart("state", 
ObjectMapperFactory.getThreadLocal().writeValueAsString(state), 
MediaType.APPLICATION_JSON));
+             org.asynchttpclient.Response response = 
asyncHttpClient.executeRequest(addAuthHeaders(functions, 
builder).build()).get();
+
+             if (response.getStatusCode() < 200 || response.getStatusCode() >= 
300) {
+                 throw 
getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+             }
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index eab62d8..deec5ff 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -27,6 +27,7 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -54,6 +55,7 @@ import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions 
(lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -71,6 +73,7 @@ public class CmdFunctions extends CmdBase {
     private final StartFunction start;
     private final ListFunctions lister;
     private final StateGetter stateGetter;
+    private final StatePutter statePutter;
     private final TriggerFunction triggerer;
     private final UploadFunction uploader;
     private final DownloadFunction downloader;
@@ -799,7 +802,7 @@ public class CmdFunctions extends CmdBase {
         }
     }
 
-    @Parameters(commandDescription = "Fetch the current state associated with 
a Pulsar Function running in cluster mode")
+    @Parameters(commandDescription = "Fetch the current state associated with 
a Pulsar Function")
     class StateGetter extends FunctionCommand {
 
         @Parameter(names = { "-k", "--key" }, description = "key")
@@ -830,6 +833,22 @@ public class CmdFunctions extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Put the state associated with a Pulsar 
Function")
+    class StatePutter extends FunctionCommand {
+
+        @Parameter(names = { "-s", "--state" }, description = "The 
FunctionState that needs to be put", required = true)
+        private String state = null;
+
+        @Override
+        void runCmd() throws Exception {
+            TypeReference<FunctionState> typeRef
+                    = new TypeReference<FunctionState>() {};
+            FunctionState stateRepr = 
ObjectMapperFactory.getThreadLocal().readValue(state, typeRef);
+            admin.functions()
+                    .putFunctionState(tenant, namespace, functionName, 
stateRepr);
+        }
+    }
+
     @Parameters(commandDescription = "Triggers the specified Pulsar Function 
with a supplied value")
     class TriggerFunction extends FunctionCommand {
         // for backward compatibility purposes
@@ -943,6 +962,7 @@ public class CmdFunctions extends CmdBase {
         functionStats = new GetFunctionStats();
         lister = new ListFunctions();
         stateGetter = new StateGetter();
+        statePutter = new StatePutter();
         triggerer = new TriggerFunction();
         uploader = new UploadFunction();
         downloader = new DownloadFunction();
@@ -962,6 +982,7 @@ public class CmdFunctions extends CmdBase {
         jcommander.addCommand("stats", getFunctionStats());
         jcommander.addCommand("list", getLister());
         jcommander.addCommand("querystate", getStateGetter());
+        jcommander.addCommand("putstate", getStatePutter());
         jcommander.addCommand("trigger", getTriggerer());
         jcommander.addCommand("upload", getUploader());
         jcommander.addCommand("download", getDownloader());
@@ -1001,6 +1022,11 @@ public class CmdFunctions extends CmdBase {
     }
 
     @VisibleForTesting
+    StatePutter getStatePutter() {
+        return statePutter;
+    }
+
+    @VisibleForTesting
     StateGetter getStateGetter() {
         return stateGetter;
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
index 15bcda0..5b7b0ca 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
@@ -34,6 +34,7 @@ import lombok.*;
 public class FunctionState {
     private String key;
     private String stringValue;
+    private byte[] byteValue;
     private Long numberValue;
     private Long version;
 }
diff --git a/pulsar-functions/python-examples/wordcount_function.py 
b/pulsar-functions/python-examples/wordcount_function.py
index d84eb95..22cf4d7 100644
--- a/pulsar-functions/python-examples/wordcount_function.py
+++ b/pulsar-functions/python-examples/wordcount_function.py
@@ -31,4 +31,5 @@ class WordCountFunction(Function):
         words = input.split()
         for word in words:
             context.incr_counter(word, 1)
+            context.get_logger().info("The value is " + 
str(context.get_counter(word)))
         return input + "!"
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 2dba1cd..62ac9bd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.gson.Gson;
 import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
@@ -56,6 +57,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
@@ -1441,7 +1443,7 @@ public abstract class ComponentImpl {
 
         // validate parameters
         try {
-            validateGetFunctionStateParams(tenant, namespace, functionName, 
key);
+            validateFunctionStateParams(tenant, namespace, functionName, key);
         } catch (IllegalArgumentException e) {
             log.error("Invalid getFunctionState request @ /{}/{}/{}/{}",
                     tenant, namespace, functionName, key, e);
@@ -1470,9 +1472,13 @@ public abstract class ComponentImpl {
                     throw new RestException(Status.NOT_FOUND, "key '" + key + 
"' doesn't exist.");
                 } else {
                     if (kv.isNumber()) {
-                        value = new FunctionState(key, null, kv.numberValue(), 
kv.version());
+                        value = new FunctionState(key, null, null, 
kv.numberValue(), kv.version());
                     } else {
-                        value = new FunctionState(key, new 
String(ByteBufUtil.getBytes(kv.value()), UTF_8), null, kv.version());
+                        try {
+                            value = new FunctionState(key, new 
String(ByteBufUtil.getBytes(kv.value(), kv.value().readerIndex(), 
kv.value().readableBytes()), UTF_8), null, null, kv.version());
+                        } catch (Exception e) {
+                            value = new FunctionState(key, null, 
ByteBufUtil.getBytes(kv.value()), null, kv.version());
+                        }
                     }
                 }
             }
@@ -1490,6 +1496,84 @@ public abstract class ComponentImpl {
         return value;
     }
 
+    public void putFunctionState(final String tenant,
+                                 final String namespace,
+                                 final String functionName,
+                                 final String key,
+                                 final FunctionState state,
+                                 final String clientRole,
+                                 final AuthenticationDataSource 
clientAuthenticationDataHttps) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (null == worker().getStateStoreAdminClient()) {
+            throwStateStoreUnvailableResponse();
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
put state for {}", tenant, namespace,
+                        functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
+                throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, 
functionName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+        
+        if (!key.equals(state.getKey())) {
+            log.error("{}/{}/{} Bad putFunction Request, path key doesn't 
match key in json", tenant, namespace, functionName);
+            throw new RestException(Status.BAD_REQUEST, "Path key doesn't 
match key in json");
+        }
+        if (state.getStringValue() == null && state.getByteValue() == null) {
+            throw new RestException(Status.BAD_REQUEST, "Setting Counter 
values not supported in put state");
+        }
+
+        // validate parameters
+        try {
+            validateFunctionStateParams(tenant, namespace, functionName, key);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid putFunctionState request @ /{}/{}/{}/{}",
+                    tenant, namespace, functionName, key, e);
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
+        }
+
+        String tableNs = getStateNamespace(tenant, namespace);
+        String tableName = functionName;
+
+        String stateStorageServiceUrl = 
worker().getWorkerConfig().getStateStorageServiceUrl();
+
+        if (storageClient.get() == null) {
+            storageClient.compareAndSet(null, StorageClientBuilder.newBuilder()
+                    .withSettings(StorageClientSettings.newBuilder()
+                            .serviceUri(stateStorageServiceUrl)
+                            .clientName("functions-admin")
+                            .build())
+                    .withNamespace(tableNs)
+                    .build());
+        }
+
+        ByteBuf value;
+        if (!isEmpty(state.getStringValue())) {
+            value = Unpooled.wrappedBuffer(state.getStringValue().getBytes());
+        } else {
+            value = Unpooled.wrappedBuffer(state.getByteValue());
+        }
+        try (Table<ByteBuf, ByteBuf> table = 
result(storageClient.get().openTable(tableName))) {
+            result(table.put(Unpooled.wrappedBuffer(key.getBytes(UTF_8)), 
value));
+        } catch 
(org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException | 
org.apache.bookkeeper.clients.exceptions.StreamNotFoundException e) {
+            log.error("Error while putFunctionState request @ /{}/{}/{}/{}",
+                    tenant, namespace, functionName, key, e);
+            throw new RestException(Status.NOT_FOUND, e.getMessage());
+        } catch (Exception e) {
+            log.error("Error while putFunctionState request @ /{}/{}/{}/{}",
+                    tenant, namespace, functionName, key, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
+        }
+    }
+
     public void uploadFunction(final InputStream uploadedInputStream, final 
String path) {
         // validate parameters
         try {
@@ -1586,10 +1670,10 @@ public abstract class ComponentImpl {
         }
     }
 
-    private void validateGetFunctionStateParams(final String tenant,
-                                                final String namespace,
-                                                final String functionName,
-                                                final String key)
+    private void validateFunctionStateParams(final String tenant,
+                                             final String namespace,
+                                             final String functionName,
+                                             final String key)
             throws IllegalArgumentException {
 
         if (tenant == null) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 38f4c4b..2f1a87f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -324,4 +324,15 @@ public class FunctionsApiV3Resource extends 
FunctionApiResource {
                                           final @PathParam("key") String key) 
throws IOException {
         return functions.getFunctionState(tenant, namespace, functionName, 
key, clientAppId(), clientAuthData());
     }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public void putFunctionState(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String 
namespace,
+                                 final @PathParam("functionName") String 
functionName,
+                                 final @PathParam("key") String key,
+                                 final @FormDataParam("state") FunctionState 
stateJson) throws IOException {
+        functions.putFunctionState(tenant, namespace, functionName, key, 
stateJson, clientAppId(), clientAuthData());
+    }
 }

Reply via email to