[GitHub] [pulsar] codelipenghui commented on issue #4100: DeadLetterTopic is not working for FailOver subscription type
codelipenghui commented on issue #4100: DeadLetterTopic is not working for FailOver subscription type
URL: https://github.com/apache/pulsar/issues/4100#issuecomment-489394769

> May i know more details on why?

For the `FailOver` and `Exclusive` subscription modes, Pulsar does not track the ack status of each message. When message redelivery happens in `FailOver` or `Exclusive` mode, the broker just rewinds the cursor (currently nothing marks which messages hit the ack timeout). That is why we originally supported this feature only in `Shared` mode. It is possible to support this feature in `FailOver` or `Exclusive` mode. As in `Shared` mode, we would need a `RedeliveryTracker` to track each ack-timeout message. The difference from `Shared` mode is that we would need an ordered structure to maintain positions.

> Also what are the alternatives suggested, does it mean cursor will not move until the message is processed and acked ?

As far as I know there is currently no alternative. If a message cannot be successfully acked, the cursor will not move. By the way, can I know your usage scenario?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
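The "ordered structure to maintain position" mentioned in the comment on #4100 could look roughly like the sketch below. This is only an illustration under stated assumptions: `OrderedRedeliveryTracker`, its `Position` class, and all method names are hypothetical and are not Pulsar's actual `RedeliveryTracker`. A position is assumed to be a (ledgerId, entryId) pair.

```java
import java.util.TreeMap;

/** Hypothetical sketch; not Pulsar's actual RedeliveryTracker. */
final class OrderedRedeliveryTracker {

    /** Assumed message position: ordered by ledger id first, then entry id. */
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // A TreeMap keeps positions sorted, so a cumulative ack can drop a whole prefix at once.
    private final TreeMap<Position, Integer> redeliveryCounts = new TreeMap<>();

    /** Records one more redelivery for the position and returns the new count. */
    int incrementAndGet(long ledgerId, long entryId) {
        return redeliveryCounts.merge(new Position(ledgerId, entryId), 1, Integer::sum);
    }

    /** Returns how often the position was redelivered (0 if it is not tracked). */
    int getCount(long ledgerId, long entryId) {
        return redeliveryCounts.getOrDefault(new Position(ledgerId, entryId), 0);
    }

    /** Forgets every tracked position up to and including the acked one. */
    void ackCumulative(long ledgerId, long entryId) {
        redeliveryCounts.headMap(new Position(ledgerId, entryId), true).clear();
    }

    /** Number of positions currently tracked. */
    int size() {
        return redeliveryCounts.size();
    }
}
```

A sorted map is chosen here because `FailOver` and `Exclusive` consumers typically ack cumulatively, so the tracker needs to discard everything at or before a given position in one step. A dead-letter decision would then compare `getCount(...)` against a configured maximum.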
[GitHub] [pulsar] sijie commented on a change in pull request #4192: [pulsar-broker]Support key value schema compatibility checker
sijie commented on a change in pull request #4192: [pulsar-broker]Support key value schema compatibility checker
URL: https://github.com/apache/pulsar/pull/4192#discussion_r281005177

## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java ##

@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * {@link KeyValueSchemaCompatibilityCheck} for {@link SchemaType#KEY_VALUE}.
+ */
+public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
+
+    private final Map checkers;
+
+    public KeyValueSchemaCompatibilityCheck(Map checkers) {
+        this.checkers = checkers;
+    }
+
+    private KeyValue splitKeyValueSchemaData(byte[] bytes) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+        int keyLength = byteBuffer.getInt();
+        byte[] keySchema = new byte[keyLength];
+        byteBuffer.get(keySchema);
+
+        int valueLength = byteBuffer.getInt();
+        byte[] valueSchema = new byte[valueLength];
+        byteBuffer.get(valueSchema);
+        return new KeyValue<>(keySchema, valueSchema);
+    }
+
+    @Override
+    public SchemaType getSchemaType() {
+        return SchemaType.KEY_VALUE;
+    }
+
+    @Override
+    public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
+        KeyValue fromKeyValue = this.splitKeyValueSchemaData(from.getData());
+        KeyValue toKeyValue = this.splitKeyValueSchemaData(to.getData());
+
+        SchemaType fromKeyType = SchemaType.valueOf(from.getProps().get("key.schema.type"));
+        SchemaType fromValueType = SchemaType.valueOf(from.getProps().get("value.schema.type"));
+        SchemaType toKeyType = SchemaType.valueOf(to.getProps().get("key.schema.type"));
+        SchemaType toValueType = SchemaType.valueOf(to.getProps().get("value.schema.type"));
+
+        if (fromKeyType != toKeyType || fromValueType != toValueType) {
+            return false;
+        }
+
+        Gson schemaGson = new Gson();
+        Map keyFromProperties = schemaGson.fromJson(from.getProps().get("key.schema.properties"), Map.class);

Review comment:
don't you need to check if `from.getProps().get("key.schema.properties")` returns null or not?
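The `splitKeyValueSchemaData` method in the diff for #4192 reads a 4-byte key length, the key bytes, a 4-byte value length, and then the value bytes. The sketch below reproduces that layout with the JDK only and adds defensive length checks in the same spirit as the review comment. The class name `KeyValueFramingSketch` and the `join`/`split` helpers are hypothetical and are not part of the pull request; the PR itself returns a `KeyValue`, which is replaced here by a two-element array to keep the example self-contained.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the length-prefixed key/value layout; not code from the PR. */
final class KeyValueFramingSketch {

    /** Encodes as: int keyLength, key bytes, int valueLength, value bytes. */
    static byte[] join(byte[] key, byte[] value) {
        return ByteBuffer.allocate(4 + key.length + 4 + value.length)
                .putInt(key.length).put(key)
                .putInt(value.length).put(value)
                .array();
    }

    /** Returns {key, value}; throws IllegalArgumentException on malformed input. */
    static byte[][] split(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte[] key = readChunk(buffer);
        byte[] value = readChunk(buffer);
        return new byte[][] {key, value};
    }

    // Validates the length prefix before allocating, so corrupt input fails with a clear message.
    private static byte[] readChunk(ByteBuffer buffer) {
        if (buffer.remaining() < 4) {
            throw new IllegalArgumentException("missing length prefix");
        }
        int length = buffer.getInt();
        if (length < 0 || length > buffer.remaining()) {
            throw new IllegalArgumentException("invalid chunk length: " + length);
        }
        byte[] chunk = new byte[length];
        buffer.get(chunk);
        return chunk;
    }
}
```

The same defensive style would apply to the property lookups the reviewer asks about: check the result of `getProps().get(...)` for null before passing it on.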
[GitHub] [pulsar] waterscar opened a new pull request #4204: Update ingress port from server to 80
waterscar opened a new pull request #4204: Update ingress port from server to 80
URL: https://github.com/apache/pulsar/pull/4204

Simply fix the typo(?) issue in the ingress port default value.
[GitHub] [pulsar] srkukarni merged pull request #4203: Check for existance of schemaInfo before accessing it
srkukarni merged pull request #4203: Check for existance of schemaInfo before accessing it
URL: https://github.com/apache/pulsar/pull/4203
[pulsar] branch master updated: Check for existance of schemaInfo before accessing it (#4203)
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 96bf9f6  Check for existance of schemaInfo before accessing it (#4203)
96bf9f6 is described below

commit 96bf9f6156db388e2ad762ed6f471d1f8be3c8a2
Author: Sanjeev Kulkarni
AuthorDate: Sat May 4 16:56:22 2019 -0700

    Check for existance of schemaInfo before accessing it (#4203)
---
 .../src/main/java/org/apache/pulsar/client/impl/MessageImpl.java    | 2 +-
 .../java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index d239dc1..d11d177 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -247,7 +247,7 @@ public class MessageImpl implements Message {
         byte [] schemaVersion = getSchemaVersion();
         if (schema.supportSchemaVersioning() && schemaVersion != null) {
             return schema.decode(getData(), schemaVersion);
-        } else if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
+        } else if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
             KeyValueSchema kvSchema = (KeyValueSchema) schema;
             if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
                 return schema.decode(getKeyBytes(), getData());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 22f235e..e7404ab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -99,7 +99,7 @@ public class TypedMessageBuilderImpl implements TypedMessageBuilder {
     public TypedMessageBuilder value(T value) {
         checkArgument(value != null, "Need Non-Null content value");
-        if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+        if (schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
             KeyValueSchema kvSchema = (KeyValueSchema) schema;
             org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
             if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
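Both hunks of commit 96bf9f6 add the same guard: test `getSchemaInfo()` for null before calling `getType()` on it, relying on `&&` to short-circuit. The sketch below isolates that pattern. `SchemaGuardSketch` and its nested `Schema`, `SchemaInfo`, and `SchemaType` are simplified stand-ins written for this example, not the Pulsar client classes.

```java
/** Hypothetical stand-ins illustrating the null guard; not the Pulsar client types. */
final class SchemaGuardSketch {

    enum SchemaType { STRING, KEY_VALUE }

    /** Minimal stand-in for schema metadata; only the type is modelled. */
    static final class SchemaInfo {
        private final SchemaType type;

        SchemaInfo(SchemaType type) {
            this.type = type;
        }

        SchemaType getType() {
            return type;
        }
    }

    /** Stand-in for a schema whose getSchemaInfo() may return null. */
    interface Schema {
        SchemaInfo getSchemaInfo();
    }

    /** Mirrors the guard in the commit: the type is only read when the info is non-null. */
    static boolean isKeyValue(Schema schema) {
        SchemaInfo info = schema.getSchemaInfo();
        return info != null && info.getType() == SchemaType.KEY_VALUE;
    }
}
```

Reading `getSchemaInfo()` once into a local variable, as done here, is a small variation on the commit, which calls the getter twice. Behaviour is the same as long as the getter returns a stable value.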
[GitHub] [pulsar] sijie closed issue #4042: For Java Functions API: Improve how message properties can be set for output messages and context.published messages
sijie closed issue #4042: For Java Functions API: Improve how message properties can be set for output messages and context.published messages
URL: https://github.com/apache/pulsar/issues/4042
[GitHub] [pulsar] sijie merged pull request #4093: [issue#4042] improve java functions API
sijie merged pull request #4093: [issue#4042] improve java functions API
URL: https://github.com/apache/pulsar/pull/4093
[pulsar] branch master updated: [issue#4042] improve java functions API (#4093)
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 65fe863  [issue#4042] improve java functions API (#4093)
65fe863 is described below

commit 65fe863f84a0446f369e3b5a44f9851c1f53c23d
Author: 冉小龙
AuthorDate: Sun May 5 07:49:03 2019 +0800

    [issue#4042] improve java functions API (#4093)

    Master Issue: #4042

    Fixes #4042

    Motivation

    improve java functions API, when you need to publish the fields in the TypedMessageBuilder, there is no need to add a new publish method, just modify the interface in the TypedMessageBuilder.
---
 .../worker/PulsarFunctionPublishTest.java          |   2 +-
 pulsar-functions/api-java/pom.xml                  |   6 +
 .../org/apache/pulsar/functions/api/Context.java   |  31 ++-
 .../pulsar/functions/instance/ContextImpl.java     | 228 ++---
 .../pulsar/functions/instance/ContextImplTest.java |   2 +-
 .../functions/api/examples/PublishFunction.java    |   9 +-
 ...geConf.java => TypedMessageBuilderPublish.java} |  19 +-
 .../api/examples/UserPublishFunction.java          |  10 +-
 ...ge_conf.py => typed_message_builder_publish.py} |   2 +-
 .../windowing/WindowFunctionExecutor.java          |   2 +-
 .../windowing/WindowFunctionExecutorTest.java      |   8 +
 .../functions/PulsarFunctionsTestBase.java         |   7 +-
 12 files changed, 221 insertions(+), 105 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index ef88fdd..e3b4131 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -279,7 +279,7 @@ public class PulsarFunctionPublishTest {
         functionConfig.setSubName(subscriptionName);
         functionConfig.setInputs(Collections.singleton(sourceTopic));
         functionConfig.setAutoAck(true);
-        functionConfig.setClassName("org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf");
+        functionConfig.setClassName("org.apache.pulsar.functions.api.examples.TypedMessageBuilderPublish");
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         Map userConfig = new HashMap<>();
         userConfig.put("publish-topic", publishTopic);
diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml
index 6f0481d..33e2ea0 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -42,6 +42,12 @@
 typetools
 test
+
+ org.apache.pulsar
+ pulsar-client-api
+ ${project.version}
+ compile
+
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 556086a..013b5f2 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -19,6 +19,10 @@
 package org.apache.pulsar.functions.api;

 import java.nio.ByteBuffer;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.slf4j.Logger;

 import java.util.Collection;
@@ -232,6 +236,7 @@ public interface Context {
      * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name
      * of the custom schema class
      * @return A future that completes when the framework is done publishing the message
+     * @deprecated in favor of using {@link #newOutputMessage(String, Schema)}
      */
     CompletableFuture publish(String topicName, O object, String schemaOrSerdeClassName);

@@ -241,28 +246,18 @@ public interface Context {
      * @param topicName The name of the topic for publishing
      * @param object The object that needs to be published
      * @return A future that completes when the framework is done publishing the message
+     * @deprecated in favor of using {@link #newOutputMessage(String, Schema)}
      */
     CompletableFuture publish(String topicName, O object);

     /**
-     * Publish an object using serDe or schema class for serializing to the topic.
-     *
-     * @param topicName The name of the topic for publishing
-     * @param object The object that needs to be published
-     * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name
[GitHub] [pulsar] srkukarni commented on issue #4203: Check for existance of schemaInfo before accessing it
srkukarni commented on issue #4203: Check for existance of schemaInfo before accessing it
URL: https://github.com/apache/pulsar/pull/4203#issuecomment-489370387

run cpp tests
[GitHub] [pulsar] srkukarni opened a new pull request #4203: Check for existance of schemaInfo before accessing it
srkukarni opened a new pull request #4203: Check for existance of schemaInfo before accessing it
URL: https://github.com/apache/pulsar/pull/4203

### Motivation

Sometimes SchemaInfo can be null. Check for those conditions.
[GitHub] [pulsar] pouledodue opened a new issue #4202: State as first-class citizen
pouledodue opened a new issue #4202: State as first-class citizen
URL: https://github.com/apache/pulsar/issues/4202

The goal is to have capabilities equivalent to DynamoDB, so I can rely less on external databases.

- Implement more capabilities of RocksDB: Get, Put, Range, Multiget
- Access all those capabilities via REST API, Python, and Java
- Make state less tied to Functions, as asked in #2489
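To make the four operations requested in #4202 concrete, here is a minimal in-memory sketch of what Get, Put, Range, and Multiget could mean for a string-keyed store. Everything in it is an assumption for illustration: `InMemoryStateSketch` and its method names are hypothetical, it is not an existing or proposed Pulsar API, and a real implementation would sit on top of RocksDB rather than a `TreeMap`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory illustration of Get/Put/Range/Multiget; not a Pulsar API. */
final class InMemoryStateSketch {

    // Sorted map, so range scans follow key order as they would in RocksDB.
    private final NavigableMap<String, String> store = new TreeMap<>();

    /** Put: stores or overwrites the value for a key. */
    void put(String key, String value) {
        store.put(key, value);
    }

    /** Get: returns the value, or null when the key is absent. */
    String get(String key) {
        return store.get(key);
    }

    /** Multiget: returns values in the order of the requested keys; absent keys yield null. */
    List<String> multiGet(List<String> keys) {
        List<String> values = new ArrayList<>(keys.size());
        for (String key : keys) {
            values.add(store.get(key));
        }
        return values;
    }

    /** Range: returns a copy of all entries with fromKey <= key < toKey, in key order. */
    NavigableMap<String, String> range(String fromKey, String toKey) {
        return new TreeMap<>(store.subMap(fromKey, true, toKey, false));
    }
}
```

The half-open range (`fromKey` inclusive, `toKey` exclusive) is one common convention for ordered stores. The issue does not specify the semantics, so this is a design choice of the sketch.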
[GitHub] [pulsar] wolfstudy commented on issue #4174: [go function] support localrun and cluster mode for go function
wolfstudy commented on issue #4174: [go function] support localrun and cluster mode for go function
URL: https://github.com/apache/pulsar/pull/4174#issuecomment-489333078

ping @srkukarni do you want to take a look as well?
[GitHub] [pulsar] wolfstudy commented on issue #4093: [issue#4042] improve java functions API
wolfstudy commented on issue #4093: [issue#4042] improve java functions API
URL: https://github.com/apache/pulsar/pull/4093#issuecomment-489324612

run java8 tests
[GitHub] [pulsar] wolfstudy commented on issue #4093: [issue#4042] improve java functions API
wolfstudy commented on issue #4093: [issue#4042] improve java functions API
URL: https://github.com/apache/pulsar/pull/4093#issuecomment-489314945

run java8 tests
[pulsar] branch asf-site updated: Updated site at revision e9619fa
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/asf-site by this push: new 2876e17 Updated site at revision e9619fa 2876e17 is described below commit 2876e17986c4d77a4af0a1907943d9f5f9d796c8 Author: jenkins AuthorDate: Sat May 4 08:35:51 2019 + Updated site at revision e9619fa --- content/docs/en/next/reference-configuration.html | 1 + .../en/next/reference-configuration/index.html | 1 + content/swagger/swagger.json | 22 +++--- content/swagger/swaggerfunctions.json | 34 +++--- 4 files changed, 30 insertions(+), 28 deletions(-) diff --git a/content/docs/en/next/reference-configuration.html b/content/docs/en/next/reference-configuration.html index 3da301b..227567b 100644 --- a/content/docs/en/next/reference-configuration.html +++ b/content/docs/en/next/reference-configuration.html @@ -237,6 +237,7 @@ managedLedgerDefaultWriteQuorumNumber of copies to store for each message2 managedLedgerDefaultAckQuorumNumber of guaranteed copies (acks to wait before write is complete)2 managedLedgerCacheSizeMBAmount of memory to use for caching data payload in managed ledger. This memory is allocated from JVM direct memory and it’s shared across all the topics running in the same broker. 
By default, uses 1/5th of available direct memory +managedLedgerCacheCopyEntriesWhether we should make a copy of the entry payloads when inserting in cachefalse managedLedgerCacheEvictionWatermarkThreshold to which bring down the cache level when eviction is triggered0.9 managedLedgerCacheEvictionFrequencyConfigure the cache eviction frequency for the managed ledger cache (evictions/sec)100.0 managedLedgerCacheEvictionTimeThresholdMillisAll entries that have stayed in cache for more than the configured time, will be evicted1000 diff --git a/content/docs/en/next/reference-configuration/index.html b/content/docs/en/next/reference-configuration/index.html index 3da301b..227567b 100644 --- a/content/docs/en/next/reference-configuration/index.html +++ b/content/docs/en/next/reference-configuration/index.html @@ -237,6 +237,7 @@ managedLedgerDefaultWriteQuorumNumber of copies to store for each message2 managedLedgerDefaultAckQuorumNumber of guaranteed copies (acks to wait before write is complete)2 managedLedgerCacheSizeMBAmount of memory to use for caching data payload in managed ledger. This memory is allocated from JVM direct memory and it’s shared across all the topics running in the same broker. 
By default, uses 1/5th of available direct memory +managedLedgerCacheCopyEntriesWhether we should make a copy of the entry payloads when inserting in cachefalse managedLedgerCacheEvictionWatermarkThreshold to which bring down the cache level when eviction is triggered0.9 managedLedgerCacheEvictionFrequencyConfigure the cache eviction frequency for the managed ledger cache (evictions/sec)100.0 managedLedgerCacheEvictionTimeThresholdMillisAll entries that have stayed in cache for more than the configured time, will be evicted1000 diff --git a/content/swagger/swagger.json b/content/swagger/swagger.json index 3a2a1d0..f9ad9ad 100644 --- a/content/swagger/swagger.json +++ b/content/swagger/swagger.json @@ -4105,7 +4105,7 @@ "200" : { "description" : "successful operation", "schema" : { - "$ref" : "#/definitions/NonPersistentTopicStats" + "$ref" : "#/definitions/TopicStats" } }, "403" : { @@ -7170,21 +7170,24 @@ "loadReportType" : { "type" : "string" }, -"memory" : { +"cpu" : { "$ref" : "#/definitions/ResourceUsage" }, "directMemory" : { "$ref" : "#/definitions/ResourceUsage" }, +"lastUpdate" : { + "type" : "integer", + "format" : "int64" +}, "bandwidthIn" : { "$ref" : "#/definitions/ResourceUsage" }, "bandwidthOut" : { "$ref" : "#/definitions/ResourceUsage" }, -"lastUpdate" : { - "type" : "integer", - "format" : "int64" +"memory" : { + "$ref" : "#/definitions/ResourceUsage" }, "msgThroughputIn" : { "type" : "number", @@ -7193,9 +7196,6 @@ "msgThroughputOut" : { "type" : "number", "format" : "double" -}, -"cpu" : { - "$ref" : "#/definitions/ResourceUsage" } } }, @@ -8180,11 +8180,11 @@ "ResourceUnit" : { "type" : "object", "properties" : { -"availableResource" : { - "$ref" : "#/definitions/ResourceDescription" -}, "resourceId" : { "type" : "string" +}, +"avai
[GitHub] [pulsar] wolfstudy commented on issue #4174: [go function] support localrun and cluster mode for go function
wolfstudy commented on issue #4174: [go function] support localrun and cluster mode for go function
URL: https://github.com/apache/pulsar/pull/4174#issuecomment-489306086

run java8 tests