This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 7003333a4 javadsl couchbase3: replace Scala function params with Java
functions (#1528)
7003333a4 is described below
commit 7003333a4e2498bac1788947a393d266ed45cb27
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Mar 27 10:20:35 2026 +0100
javadsl couchbase3: replace Scala function params with Java functions
(#1528)
* Initial plan
* Fix javadsl CouchbaseFlow and CouchbaseSink to use Java functions instead
of Scala functions
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/33ed6416-d555-48ae-92e0-939238e581ef
* Update CouchbaseFlow.scala
* compile issues
* try to remove implicits
* Update javadsl.backwards.excludes
* more changes
* more changes
* Update javadsl.backwards.excludes
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../javadsl.backwards.excludes | 38 +-
.../couchbase3/javadsl/CouchbaseFlow.scala | 400 ++++++++++++++++++---
.../couchbase3/javadsl/CouchbaseSink.scala | 162 +++++++--
3 files changed, 512 insertions(+), 88 deletions(-)
diff --git
a/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
b/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
index c34b73646..9b976e29b 100644
---
a/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
+++
b/couchbase3/src/main/mima-filters/2.0.x.backward.excludes/javadsl.backwards.excludes
@@ -15,12 +15,44 @@
# specific language governing permissions and limitations
# under the License.
-# changed methods to return CompletionStage instead of Scala Future
+# changed Java DSL significantly because it was used Scala Futures (changed to
CompletionStage),
+# implicit params in separate param lists (now just one param list with no
implicits) and defaulted parameters
+# have been replaced with method overloads
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.append$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.exists$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.get$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getAllReplicas$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getJson$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.increment$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.insertDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.prepend$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.replaceDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.upsertDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getAllReplicasJson$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getAllReplicasObject$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getAllReplicasType$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getObject$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.getType$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.mutateIn$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.mutateInDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.touch$*")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseFlow.exists")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.exists")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.exists")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insert")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insert")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insertDoc")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.remove")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.remove")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replace")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replace")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replaceDoc")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsert")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsert")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsertDoc")
-ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insert")
-ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.insertDoc")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replaceDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsertDoc$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.exists$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.remove$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.replace$*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.couchbase3.javadsl.CouchbaseSink.upsert$*")
diff --git
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
index 49fb20014..f771af8a3 100644
---
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
+++
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseFlow.scala
@@ -27,6 +27,7 @@ import org.apache.pekko.stream.javadsl.Flow
import org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseFlow
=> ScalaCouchbaseFlow }
import java.time.{ Duration, Instant }
+import java.util.function.{ Function => JFunction }
object CouchbaseFlow {
@@ -34,69 +35,145 @@ object CouchbaseFlow {
* get a document by id from Couchbase collection
* @param options reference to Couchbase options doc
*/
- def get(options: GetOptions = GetOptions.getOptions)(
- implicit asyncCollection: AsyncCollection): Flow[String, GetResult,
NotUsed] =
- ScalaCouchbaseFlow.get(options).asJava
+ def get(options: GetOptions,
+ asyncCollection: AsyncCollection): Flow[String, GetResult, NotUsed] =
+ ScalaCouchbaseFlow.get(options)(asyncCollection).asJava
+
+ /**
+ * get a document by id from Couchbase collection
+ * @param options reference to Couchbase options doc
+ */
+ def get(asyncCollection: AsyncCollection): Flow[String, GetResult, NotUsed] =
+ ScalaCouchbaseFlow.get(GetOptions.getOptions)(asyncCollection).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.get]] deserialize to Couchbase JsonObject
+ */
+ def getJson(options: GetOptions,
+ asyncCollection: AsyncCollection): Flow[String, JsonObject, NotUsed] =
+ ScalaCouchbaseFlow.getJson(options)(asyncCollection).asJava
/**
* reference to [[CouchbaseFlow.get]] deserialize to Couchbase JsonObject
*/
- def getJson(options: GetOptions = GetOptions.getOptions)(
- implicit asyncCollection: AsyncCollection): Flow[String, JsonObject,
NotUsed] =
- ScalaCouchbaseFlow.getJson(options).asJava
+ def getJson(asyncCollection: AsyncCollection): Flow[String, JsonObject,
NotUsed] =
+ ScalaCouchbaseFlow.getJson(GetOptions.getOptions)(asyncCollection).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.get]],deserialize to class
+ * If you add DefaultScalaModule to jackson of couchbase, it could
deserialize to scala class
+ */
+ def getObject[T](target: Class[T], options: GetOptions,
+ asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getObject[T](target, options)(asyncCollection).asJava
/**
* reference to [[CouchbaseFlow.get]],deserialize to class
* If you add DefaultScalaModule to jackson of couchbase, it could
deserialize to scala class
*/
- def getObject[T](target: Class[T], options: GetOptions =
GetOptions.getOptions)(
- implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
- ScalaCouchbaseFlow.getObject[T](target, options).asJava
+ def getObject[T](target: Class[T], asyncCollection: AsyncCollection):
Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getObject[T](target,
GetOptions.getOptions)(asyncCollection).asJava
+
+ /**
+ * reference to [[CouchbaseSource.getObject]],deserialize to class with
Generics
+ */
+ def getType[T](target: TypeRef[T], options: GetOptions,
+ asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getType[T](target, options)(asyncCollection).asJava
/**
* reference to [[CouchbaseSource.getObject]],deserialize to class with
Generics
*/
- def getType[T](target: TypeRef[T], options: GetOptions =
GetOptions.getOptions)(
- implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
- ScalaCouchbaseFlow.getType[T](target, options).asJava
+ def getType[T](target: TypeRef[T], asyncCollection: AsyncCollection):
Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getType[T](target,
GetOptions.getOptions)(asyncCollection).asJava
/**
* similar to [[CouchbaseFlow.get]], but reads from all replicas on the
active node
* @see [[CouchbaseFlow#get]]
*/
- def getAllReplicas(options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
- implicit asyncCollection: AsyncCollection): Flow[String,
GetReplicaResult, NotUsed] =
- ScalaCouchbaseFlow.getAllReplicas(options).asJava
+ def getAllReplicas(options: GetAllReplicasOptions,
+ asyncCollection: AsyncCollection): Flow[String, GetReplicaResult,
NotUsed] =
+ ScalaCouchbaseFlow.getAllReplicas(options)(asyncCollection).asJava
+
+ /**
+ * similar to [[CouchbaseFlow.get]], but reads from all replicas on the
active node
+ * @see [[CouchbaseFlow#get]]
+ */
+ def getAllReplicas(asyncCollection: AsyncCollection): Flow[String,
GetReplicaResult, NotUsed] =
+
ScalaCouchbaseFlow.getAllReplicas(GetAllReplicasOptions.getAllReplicasOptions)(asyncCollection).asJava
/**
* reference to [[CouchbaseFlow.getAllReplicas]], deserialize to Couchbase
JsonObject
*/
- def getAllReplicasJson(options: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
- implicit asyncCollection: AsyncCollection): Flow[String, JsonObject,
NotUsed] =
- ScalaCouchbaseFlow.getAllReplicasJson(options).asJava
+ def getAllReplicasJson(options: GetAllReplicasOptions,
+ asyncCollection: AsyncCollection): Flow[String, JsonObject, NotUsed] =
+ ScalaCouchbaseFlow.getAllReplicasJson(options)(asyncCollection).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to Couchbase
JsonObject
+ */
+ def getAllReplicasJson(asyncCollection: AsyncCollection): Flow[String,
JsonObject, NotUsed] =
+
ScalaCouchbaseFlow.getAllReplicasJson(GetAllReplicasOptions.getAllReplicasOptions)(asyncCollection).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.getAllReplicas]], deserialize to class
+ * If you add DefaultScalaModule to jackson of couchbase, it could
deserialize to scala class
+ */
+ def getAllReplicasObject[T](target: Class[T],
+ getOptions: GetAllReplicasOptions,
+ asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getAllReplicasObject[T](target,
getOptions)(asyncCollection).asJava
/**
* reference to [[CouchbaseFlow.getAllReplicas]], deserialize to class
* If you add DefaultScalaModule to jackson of couchbase, it could
deserialize to scala class
*/
def getAllReplicasObject[T](target: Class[T],
- getOptions: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
- implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
- ScalaCouchbaseFlow.getAllReplicasObject[T](target, getOptions).asJava
+ asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getAllReplicasObject[T](target,
GetAllReplicasOptions.getAllReplicasOptions)(
+ asyncCollection).asJava
/**
* reference to [[CouchbaseFlow.getAllReplicasObject]], deserialize to class
with Generics
*/
def getAllReplicasType[T](target: TypeRef[T],
- getOptions: GetAllReplicasOptions =
GetAllReplicasOptions.getAllReplicasOptions)(
- implicit asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
- ScalaCouchbaseFlow.getAllReplicasType(target, getOptions).asJava
+ getOptions: GetAllReplicasOptions,
+ asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getAllReplicasType(target,
getOptions)(asyncCollection).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.getAllReplicasObject]], deserialize to class
with Generics
+ */
+ def getAllReplicasType[T](target: TypeRef[T],
+ asyncCollection: AsyncCollection): Flow[String, T, NotUsed] =
+ ScalaCouchbaseFlow.getAllReplicasType(target,
GetAllReplicasOptions.getAllReplicasOptions)(asyncCollection).asJava
/**
* Inserts a full document which does not exist yet with custom options.
* @param applyId parse id function, which is the document id
* @see [[com.couchbase.client.java.AsyncCollection#insert]]
*/
+ def insert[T](applyId: JFunction[T, String],
+ insertOptions: InsertOptions,
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.insert[T](applyId.apply,
insertOptions)(asyncCollection).asJava
+
+ /**
+ * Inserts a full document which does not exist yet with custom options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#insert]]
+ */
+ def insert[T](applyId: JFunction[T, String],
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.insert[T](applyId.apply)(asyncCollection).asJava
+
+ /**
+ * Inserts a full document which does not exist yet with custom options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#insert]]
+ * @deprecated Use the overloaded method that takes a
java.util.function.Function instead (since 2.0.0)
+ */
+ @deprecated("Use the overloaded method that takes a
java.util.function.Function instead", since = "2.0.0")
def insert[T](applyId: T => String,
insertOptions: InsertOptions = InsertOptions.insertOptions())(
implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
@@ -106,15 +183,43 @@ object CouchbaseFlow {
* reference to [[CouchbaseFlow.insert]] <br>
* use MutationDocument to wrapper id, document and result(MutationResult)
*/
- def insertDoc[T](insertOptions: InsertOptions =
InsertOptions.insertOptions())(
- implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
- ScalaCouchbaseFlow.insertDoc[T](insertOptions).asJava
+ def insertDoc[T](insertOptions: InsertOptions,
+ asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ ScalaCouchbaseFlow.insertDoc[T](insertOptions)(asyncCollection).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.insert]] <br>
+ * use MutationDocument to wrapper id, document and result(MutationResult)
+ */
+ def insertDoc[T](asyncCollection: AsyncCollection):
Flow[MutationDocument[T], MutationDocument[T], NotUsed] =
+
ScalaCouchbaseFlow.insertDoc[T](InsertOptions.insertOptions())(asyncCollection).asJava
/**
* Replaces a full document which already exists with custom options.
* @param applyId parse id function, which is the document id
* @see [[com.couchbase.client.java.AsyncCollection#replace]]
*/
+ def replace[T](applyId: JFunction[T, String],
+ replaceOptions: ReplaceOptions,
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.replace[T](applyId.apply,
replaceOptions)(asyncCollection).asJava
+
+ /**
+ * Replaces a full document which already exists with custom options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#replace]]
+ */
+ def replace[T](applyId: JFunction[T, String],
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.replace[T](applyId.apply)(asyncCollection).asJava
+
+ /**
+ * Replaces a full document which already exists with custom options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#replace]]
+ * @deprecated Use the overloaded method that takes a
java.util.function.Function instead (since 2.0.0)
+ */
+ @deprecated("Use the overloaded method that takes a
java.util.function.Function instead", since = "2.0.0")
def replace[T](applyId: T => String,
replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
@@ -124,15 +229,43 @@ object CouchbaseFlow {
* reference to [[CouchbaseFlow.replace]]
* use MutationDocument to wrapper id, document and result(MutationResult)
*/
- def replaceDoc[T](replaceOptions: ReplaceOptions =
ReplaceOptions.replaceOptions())(
- implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
- ScalaCouchbaseFlow.replaceDoc[T](replaceOptions).asJava
+ def replaceDoc[T](replaceOptions: ReplaceOptions,
+ asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ ScalaCouchbaseFlow.replaceDoc[T](replaceOptions)(asyncCollection).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.replace]]
+ * use MutationDocument to wrapper id, document and result(MutationResult)
+ */
+ def replaceDoc[T](asyncCollection: AsyncCollection):
Flow[MutationDocument[T], MutationDocument[T], NotUsed] =
+
ScalaCouchbaseFlow.replaceDoc[T](ReplaceOptions.replaceOptions())(asyncCollection).asJava
+
+ /**
+ * Upsert a full document which might or might not exist yet with custom
options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#upsert]]
+ */
+ def upsert[T](applyId: JFunction[T, String],
+ upsertOptions: UpsertOptions,
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.upsert[T](applyId.apply,
upsertOptions)(asyncCollection).asJava
/**
* Upsert a full document which might or might not exist yet with custom
options.
* @param applyId parse id function, which is the document id
* @see [[com.couchbase.client.java.AsyncCollection#upsert]]
*/
+ def upsert[T](applyId: JFunction[T, String],
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.upsert[T](applyId.apply)(asyncCollection).asJava
+
+ /**
+ * Upsert a full document which might or might not exist yet with custom
options.
+ * @param applyId parse id function, which is the document id
+ * @see [[com.couchbase.client.java.AsyncCollection#upsert]]
+ * @deprecated Use the overloaded method that takes a
java.util.function.Function instead (since 2.0.0)
+ */
+ @deprecated("Use the overloaded method that takes a
java.util.function.Function instead", since = "2.0.0")
def upsert[T](applyId: T => String,
upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
implicit asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
@@ -143,15 +276,46 @@ object CouchbaseFlow {
* use MutationDocument to wrapper id, document and result(MutationResult)
*/
def upsertDoc[T](
- upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
- implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
- ScalaCouchbaseFlow.upsertDoc[T](upsertOptions).asJava
+ upsertOptions: UpsertOptions,
+ asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ ScalaCouchbaseFlow.upsertDoc[T](upsertOptions)(asyncCollection).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.upsert]]
+ * use MutationDocument to wrapper id, document and result(MutationResult)
+ */
+ def upsertDoc[T](
+ asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+
ScalaCouchbaseFlow.upsertDoc[T](UpsertOptions.upsertOptions())(asyncCollection).asJava
+
+ /**
+ * Removes a Document from a collection with custom options.
+ * @param applyId parse id function, which is the document id, id streams
can use `remove[String](e => e)`
+ * @see [[com.couchbase.client.java.AsyncCollection#remove]]
+ */
+ def remove[T](
+ applyId: JFunction[T, String],
+ removeOptions: RemoveOptions,
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.remove[T](applyId.apply,
removeOptions)(asyncCollection).asJava
+
+ /**
+ * Removes a Document from a collection with custom options.
+ * @param applyId parse id function, which is the document id, id streams
can use `remove[String](e => e)`
+ * @see [[com.couchbase.client.java.AsyncCollection#remove]]
+ */
+ def remove[T](
+ applyId: JFunction[T, String],
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.remove[T](applyId.apply)(asyncCollection).asJava
/**
* Removes a Document from a collection with custom options.
* @param applyId parse id function, which is the document id, id streams
can use `remove[String](e => e)`
* @see [[com.couchbase.client.java.AsyncCollection#remove]]
+ * @deprecated Use the overloaded method that takes a
java.util.function.Function instead (since 2.0.0)
*/
+ @deprecated("Use the overloaded method that takes a
java.util.function.Function instead", since = "2.0.0")
def remove[T](
applyId: T => String,
removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
@@ -162,9 +326,17 @@ object CouchbaseFlow {
* Performs mutations to document fragments with custom options.
* @see [[com.couchbase.client.java.AsyncCollection#mutateIn]]
*/
- def mutateIn(specs: java.util.List[MutateInSpec], options: MutateInOptions =
MutateInOptions.mutateInOptions())(
- implicit asyncCollection: AsyncCollection): Flow[String, MutateInResult,
NotUsed] =
- ScalaCouchbaseFlow.mutateIn(specs, options).asJava
+ def mutateIn(specs: java.util.List[MutateInSpec], options: MutateInOptions,
+ asyncCollection: AsyncCollection): Flow[String, MutateInResult, NotUsed]
=
+ ScalaCouchbaseFlow.mutateIn(specs, options)(asyncCollection).asJava
+
+ /**
+ * Performs mutations to document fragments with custom options.
+ * @see [[com.couchbase.client.java.AsyncCollection#mutateIn]]
+ */
+ def mutateIn(specs: java.util.List[MutateInSpec],
+ asyncCollection: AsyncCollection): Flow[String, MutateInResult, NotUsed]
=
+ ScalaCouchbaseFlow.mutateIn(specs,
MutateInOptions.mutateInOptions())(asyncCollection).asJava
/**
* reference to [[CouchbaseFlow.mutateIn]]
@@ -173,9 +345,19 @@ object CouchbaseFlow {
*/
def mutateInDoc[T](
specs: java.util.List[MutateInSpec],
- options: MutateInOptions = MutateInOptions.mutateInOptions())(
- implicit asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
- ScalaCouchbaseFlow.mutateInDoc[T](specs, options).asJava
+ options: MutateInOptions,
+ asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ ScalaCouchbaseFlow.mutateInDoc[T](specs, options)(asyncCollection).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.mutateIn]]
+ * use MutationDocument to wrapper id, document and result(MutationResult)
+ * @return
+ */
+ def mutateInDoc[T](
+ specs: java.util.List[MutateInSpec],
+ asyncCollection: AsyncCollection): Flow[MutationDocument[T],
MutationDocument[T], NotUsed] =
+ ScalaCouchbaseFlow.mutateInDoc[T](specs,
MutateInOptions.mutateInOptions())(asyncCollection).asJava
/**
* Checks if the given document ID exists on the active partition with
custom options.
@@ -183,23 +365,76 @@ object CouchbaseFlow {
* @see [[com.couchbase.client.java.AsyncCollection#exists]]
*/
def exists[T](
- applyId: T => String,
- existsOptions: ExistsOptions = ExistsOptions.existsOptions())(
+ applyId: JFunction[T, String],
+ existsOptions: ExistsOptions,
+ asyncCollection: AsyncCollection): Flow[T, Boolean, NotUsed] =
+ ScalaCouchbaseFlow.exists[T](applyId.apply,
existsOptions)(asyncCollection).asJava
+
+ /**
+ * Checks if the given document ID exists on the active partition with
custom options.
+ * @param applyId parse id function, which is the document id, id streams
can use `exists[String](e => e)`
+ * @see [[com.couchbase.client.java.AsyncCollection#exists]]
+ */
+ def exists[T](
+ applyId: JFunction[T, String],
+ asyncCollection: AsyncCollection): Flow[T, Boolean, NotUsed] =
+ ScalaCouchbaseFlow.exists[T](applyId.apply)(asyncCollection).asJava
+
+ /**
+ * Checks if the given document ID exists on the active partition with
custom options.
+ * @param applyId parse id function, which is the document id, id streams
can use `exists[String](e => e)`
+ * @see [[com.couchbase.client.java.AsyncCollection#exists]]
+ * @deprecated Use the overloaded method that takes a
java.util.function.Function instead (since 2.0.0)
+ */
+ @deprecated("Use the overloaded method that takes a
java.util.function.Function instead", since = "2.0.0")
+ def exists[T](
+ applyId: T => String)(
implicit asyncCollection: AsyncCollection): Flow[T, Boolean, NotUsed] =
- ScalaCouchbaseFlow.exists[T](applyId, existsOptions).asJava
+ ScalaCouchbaseFlow.exists[T](applyId).asJava
/**
* Updates the expiry of the document with the given id with custom options.
* @see [[com.couchbase.client.java.AsyncCollection#touch]]
*/
- def touch(expiry: Duration, options: TouchOptions =
TouchOptions.touchOptions())(
- implicit asyncCollection: AsyncCollection): Flow[String, MutationResult,
NotUsed] =
- ScalaCouchbaseFlow.touch(expiry, options).asJava
+ def touch(expiry: Duration, options: TouchOptions,
+ asyncCollection: AsyncCollection): Flow[String, MutationResult, NotUsed]
=
+ ScalaCouchbaseFlow.touch(expiry, options)(asyncCollection).asJava
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+ */
+ def touch(expiry: Duration,
+ asyncCollection: AsyncCollection): Flow[String, MutationResult, NotUsed]
=
+ ScalaCouchbaseFlow.touch(expiry,
TouchOptions.touchOptions())(asyncCollection).asJava
/**
* Updates the expiry of the document with the given id with custom options.
* @param applyId parse id function, which is the document id
*/
+ def touchDuration[T](
+ applyId: JFunction[T, String],
+ expiry: Duration,
+ touchOptions: TouchOptions,
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.touchDuration[T](applyId.apply, expiry,
touchOptions)(asyncCollection).asJava
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @param applyId parse id function, which is the document id
+ */
+ def touchDuration[T](
+ applyId: JFunction[T, String],
+ expiry: Duration,
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.touchDuration[T](applyId.apply,
expiry)(asyncCollection).asJava
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @param applyId parse id function, which is the document id
+ * @deprecated Use the overloaded method that takes a
java.util.function.Function instead (since 2.0.0)
+ */
+ @deprecated("Use the overloaded method that takes a
java.util.function.Function instead", since = "2.0.0")
def touchDuration[T](
applyId: T => String,
expiry: Duration,
@@ -211,6 +446,29 @@ object CouchbaseFlow {
* Updates the expiry of the document with the given id with custom options.
* @see [[com.couchbase.client.java.AsyncCollection#touch]]
*/
+ def touchInstant[T](
+ applyId: JFunction[T, String],
+ expiry: Instant,
+ touchOptions: TouchOptions,
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.touchInstant[T](applyId.apply, expiry,
touchOptions)(asyncCollection).asJava
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+ */
+ def touchInstant[T](
+ applyId: JFunction[T, String],
+ expiry: Instant,
+ asyncCollection: AsyncCollection): Flow[T, T, NotUsed] =
+ ScalaCouchbaseFlow.touchInstant[T](applyId.apply,
expiry)(asyncCollection).asJava
+
+ /**
+ * Updates the expiry of the document with the given id with custom options.
+ * @see [[com.couchbase.client.java.AsyncCollection#touch]]
+ * @deprecated Use the overloaded method that takes a
java.util.function.Function instead (since 2.0.0)
+ */
+ @deprecated("Use the overloaded method that takes a
java.util.function.Function instead", since = "2.0.0")
def touchInstant[T](
applyId: T => String,
expiry: Instant,
@@ -222,32 +480,60 @@ object CouchbaseFlow {
* Appends binary content to the document with custom options.
* @see [[com.couchbase.client.java.AsyncBinaryCollection#append]]
*/
- def append(options: AppendOptions = AppendOptions.appendOptions())(
- implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument,
MutationResult, NotUsed] =
- ScalaCouchbaseFlow.append(options).asJava
+ def append(options: AppendOptions,
+ asyncCollection: AsyncCollection): Flow[MutationBinaryDocument,
MutationResult, NotUsed] =
+ ScalaCouchbaseFlow.append(options)(asyncCollection).asJava
+
+ /**
+ * Appends binary content to the document with custom options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#append]]
+ */
+ def append(asyncCollection: AsyncCollection): Flow[MutationBinaryDocument,
MutationResult, NotUsed] =
+
ScalaCouchbaseFlow.append(AppendOptions.appendOptions())(asyncCollection).asJava
/**
* Prepends binary content to the document with custom options.
* @see [[com.couchbase.client.java.AsyncBinaryCollection#prepend]]
*/
- def prepend(options: PrependOptions = PrependOptions.prependOptions())(
- implicit asyncCollection: AsyncCollection): Flow[MutationBinaryDocument,
MutationResult, NotUsed] =
- ScalaCouchbaseFlow.prepend(options).asJava
+ def prepend(options: PrependOptions,
+ asyncCollection: AsyncCollection): Flow[MutationBinaryDocument,
MutationResult, NotUsed] =
+ ScalaCouchbaseFlow.prepend(options)(asyncCollection).asJava
+
+ /**
+ * Prepends binary content to the document with custom options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#prepend]]
+ */
+ def prepend(asyncCollection: AsyncCollection): Flow[MutationBinaryDocument,
MutationResult, NotUsed] =
+
ScalaCouchbaseFlow.prepend(PrependOptions.prependOptions())(asyncCollection).asJava
+
+ /**
+ * Increments the counter document by one or the number defined in the
options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#increment]]
+ */
+ def increment(options: IncrementOptions,
+ asyncCollection: AsyncCollection): Flow[String, CounterResult, NotUsed] =
+ ScalaCouchbaseFlow.increment(options)(asyncCollection).asJava
/**
* Increments the counter document by one or the number defined in the
options.
* @see [[com.couchbase.client.java.AsyncBinaryCollection#increment]]
*/
- def increment(options: IncrementOptions =
IncrementOptions.incrementOptions())(
- implicit asyncCollection: AsyncCollection): Flow[String, CounterResult,
NotUsed] =
- ScalaCouchbaseFlow.increment(options).asJava
+ def increment(asyncCollection: AsyncCollection): Flow[String, CounterResult,
NotUsed] =
+
ScalaCouchbaseFlow.increment(IncrementOptions.incrementOptions())(asyncCollection).asJava
+
+ /**
+ * Decrements the counter document by one or the number defined in the
options.
+ * @see [[com.couchbase.client.java.AsyncBinaryCollection#decrement]]
+ */
+ def decrement(options: DecrementOptions,
+ asyncCollection: AsyncCollection): Flow[String, CounterResult, NotUsed] =
+ ScalaCouchbaseFlow.decrement(options)(asyncCollection).asJava
/**
* Decrements the counter document by one or the number defined in the
options.
* @see [[com.couchbase.client.java.AsyncBinaryCollection#decrement]]
*/
- def decrement(options: DecrementOptions)(
- implicit asyncCollection: AsyncCollection): Flow[String, CounterResult,
NotUsed] =
- ScalaCouchbaseFlow.decrement(options).asJava
+ def decrement(asyncCollection: AsyncCollection): Flow[String, CounterResult,
NotUsed] =
+
ScalaCouchbaseFlow.decrement(DecrementOptions.decrementOptions())(asyncCollection).asJava
}
diff --git
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
index 9cc24ff9b..9da63df22 100644
---
a/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
+++
b/couchbase3/src/main/scala/org/apache/pekko/stream/connectors/couchbase3/javadsl/CouchbaseSink.scala
@@ -25,6 +25,7 @@ import
org.apache.pekko.stream.connectors.couchbase3.scaladsl.{ CouchbaseSink =>
import org.apache.pekko.stream.javadsl.Sink
import java.util.concurrent.CompletionStage
+import java.util.function.{ Function => JFunction }
import scala.concurrent.Future
import scala.jdk.FutureConverters._
@@ -38,9 +39,22 @@ object CouchbaseSink {
* </p>
* @see {@link #insertDocFuture} which works like this method worked in 1.x.
*/
- def insertDoc[T](insertOptions: InsertOptions)(
- implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
-
ScalaCouchbaseSink.insertDoc[T](insertOptions).mapMaterializedValue(_.asJava).asJava
+ def insertDoc[T](insertOptions: InsertOptions,
+ asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
+
ScalaCouchbaseSink.insertDoc[T](insertOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.insertDoc]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #insertDocFuture} which works like this method worked in 1.x.
+ */
+ def insertDoc[T](
+ asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
+
ScalaCouchbaseSink.insertDoc[T](InsertOptions.insertOptions())(asyncCollection).mapMaterializedValue(
+ _.asJava).asJava
/**
* reference to [[CouchbaseFlow.insertDoc]]
@@ -59,10 +73,23 @@ object CouchbaseSink {
* </p>
* @see {@link #insertFuture} which works like this method worked in 1.x.
*/
- def insert[T](applyId: T => String,
- insertOptions: InsertOptions)(
- implicit asyncCollection: AsyncCollection): Sink[T,
CompletionStage[Done]] =
- ScalaCouchbaseSink.insert[T](applyId,
insertOptions).mapMaterializedValue(_.asJava).asJava
+ def insert[T](applyId: JFunction[T, String],
+ insertOptions: InsertOptions,
+ asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+ ScalaCouchbaseSink.insert[T](applyId.apply,
insertOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.insert]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #insertFuture} which works like this method worked in 1.x.
+ */
+ def insert[T](applyId: JFunction[T, String],
+ asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+ ScalaCouchbaseSink.insert[T](applyId.apply,
InsertOptions.insertOptions())(asyncCollection).mapMaterializedValue(
+ _.asJava).asJava
/**
* reference to [[CouchbaseFlow.insert]]
@@ -82,9 +109,22 @@ object CouchbaseSink {
* </p>
* @see {@link #upsertDocFuture} which works like this method worked in 1.x.
*/
- def upsertDoc[T](upsertOptions: UpsertOptions =
UpsertOptions.upsertOptions())(
- implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
-
ScalaCouchbaseSink.upsertDoc[T](upsertOptions).mapMaterializedValue(_.asJava).asJava
+ def upsertDoc[T](upsertOptions: UpsertOptions,
+ asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
+
ScalaCouchbaseSink.upsertDoc[T](upsertOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.upsertDoc]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #upsertDocFuture} which works like this method worked in 1.x.
+ */
+ def upsertDoc[T](
+ asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
+
ScalaCouchbaseSink.upsertDoc[T](UpsertOptions.upsertOptions())(asyncCollection).mapMaterializedValue(
+ _.asJava).asJava
/**
* reference to [[CouchbaseFlow.upsertDoc]]
@@ -103,10 +143,23 @@ object CouchbaseSink {
* </p>
* @see {@link #upsertFuture} which works like this method worked in 1.x.
*/
- def upsert[T](applyId: T => String,
- upsertOptions: UpsertOptions = UpsertOptions.upsertOptions())(
- implicit asyncCollection: AsyncCollection): Sink[T,
CompletionStage[Done]] =
- ScalaCouchbaseSink.upsert[T](applyId,
upsertOptions).mapMaterializedValue(_.asJava).asJava
+ def upsert[T](applyId: JFunction[T, String],
+ upsertOptions: UpsertOptions,
+ asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+ ScalaCouchbaseSink.upsert[T](applyId.apply,
upsertOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.upsert]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #upsertFuture} which works like this method worked in 1.x.
+ */
+ def upsert[T](applyId: JFunction[T, String],
+ asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+ ScalaCouchbaseSink.upsert[T](applyId.apply,
UpsertOptions.upsertOptions())(asyncCollection).mapMaterializedValue(
+ _.asJava).asJava
/**
* reference to [[CouchbaseFlow.upsert]]
@@ -127,9 +180,22 @@ object CouchbaseSink {
* @see {@link #replaceDocFuture} which works like this method worked in 1.x.
*/
def replaceDoc[T](
- replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
- implicit asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
-
ScalaCouchbaseSink.replaceDoc[T](replaceOptions).mapMaterializedValue(_.asJava).asJava
+ replaceOptions: ReplaceOptions,
+ asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
+
ScalaCouchbaseSink.replaceDoc[T](replaceOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.replaceDoc]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #replaceDocFuture} which works like this method worked in 1.x.
+ */
+ def replaceDoc[T](
+ asyncCollection: AsyncCollection): Sink[MutationDocument[T],
CompletionStage[Done]] =
+
ScalaCouchbaseSink.replaceDoc[T](ReplaceOptions.replaceOptions())(asyncCollection).mapMaterializedValue(
+ _.asJava).asJava
/**
* reference to [[CouchbaseFlow.replaceDoc]]
@@ -149,10 +215,23 @@ object CouchbaseSink {
* </p>
* @see {@link #replaceFuture} which works like this method worked in 1.x.
*/
- def replace[T](applyId: T => String,
- replaceOptions: ReplaceOptions = ReplaceOptions.replaceOptions())(
- implicit asyncCollection: AsyncCollection): Sink[T,
CompletionStage[Done]] =
- ScalaCouchbaseSink.replace[T](applyId,
replaceOptions).mapMaterializedValue(_.asJava).asJava
+ def replace[T](applyId: JFunction[T, String],
+ replaceOptions: ReplaceOptions,
+ asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+ ScalaCouchbaseSink.replace[T](applyId.apply,
replaceOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.replace]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #replaceFuture} which works like this method worked in 1.x.
+ */
+ def replace[T](applyId: JFunction[T, String],
+ asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+ ScalaCouchbaseSink.replace[T](applyId.apply,
ReplaceOptions.replaceOptions())(asyncCollection).mapMaterializedValue(
+ _.asJava).asJava
/**
* reference to [[CouchbaseFlow.replace]]
@@ -172,10 +251,23 @@ object CouchbaseSink {
* </p>
* @see {@link #removeFuture} which works like this method worked in 1.x.
*/
- def remove[T](applyId: T => String,
- removeOptions: RemoveOptions = RemoveOptions.removeOptions())(
- implicit asyncCollection: AsyncCollection): Sink[T,
CompletionStage[Done]] =
- ScalaCouchbaseSink.remove[T](applyId,
removeOptions).mapMaterializedValue(_.asJava).asJava
+ def remove[T](applyId: JFunction[T, String],
+ removeOptions: RemoveOptions,
+ asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+ ScalaCouchbaseSink.remove[T](applyId.apply,
removeOptions)(asyncCollection).mapMaterializedValue(_.asJava).asJava
+
+ /**
+ * reference to [[CouchbaseFlow.remove]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #removeFuture} which works like this method worked in 1.x.
+ */
+ def remove[T](applyId: JFunction[T, String],
+ asyncCollection: AsyncCollection): Sink[T, CompletionStage[Done]] =
+ ScalaCouchbaseSink.remove[T](applyId.apply,
RemoveOptions.removeOptions())(asyncCollection).mapMaterializedValue(
+ _.asJava).asJava
/**
* reference to [[CouchbaseFlow.remove]]
@@ -195,9 +287,23 @@ object CouchbaseSink {
* </p>
* @see {@link #existsFuture} which works like this method worked in 1.x.
*/
- def exists[T](applyId: T => String, existsOptions: ExistsOptions =
ExistsOptions.existsOptions())(
- implicit asyncCollection: AsyncCollection): Sink[T,
CompletionStage[java.lang.Boolean]] =
- ScalaCouchbaseSink.exists[T](applyId, existsOptions)
+ def exists[T](applyId: JFunction[T, String], existsOptions: ExistsOptions,
+ asyncCollection: AsyncCollection): Sink[T,
CompletionStage[java.lang.Boolean]] =
+ ScalaCouchbaseSink.exists[T](applyId.apply, existsOptions)(asyncCollection)
+
.mapMaterializedValue(_.map(Boolean.box)(scala.concurrent.ExecutionContext.parasitic).asJava)
+ .asJava
+
+ /**
+ * reference to [[CouchbaseFlow.exists]]
+ * <p>
+ * This function's return type changed in 2.0.0 to return a Sink with a
CompletionStage instead of a
+ * Scala Future, to be more consistent with Java usage.
+ * </p>
+ * @see {@link #existsFuture} which works like this method worked in 1.x.
+ */
+ def exists[T](applyId: JFunction[T, String],
+ asyncCollection: AsyncCollection): Sink[T,
CompletionStage[java.lang.Boolean]] =
+ ScalaCouchbaseSink.exists[T](applyId.apply,
ExistsOptions.existsOptions())(asyncCollection)
.mapMaterializedValue(_.map(Boolean.box)(scala.concurrent.ExecutionContext.parasitic).asJava)
.asJava
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]