This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new d689008be Fix Scala types leaking into javadsl public APIs (#1527)
d689008be is described below

commit d689008bea519ad1e85961c08a65d10d7724b34f
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Mar 27 19:23:26 2026 +0100

    Fix Scala types leaking into javadsl public APIs (#1527)
    
    * Initial plan
    
    * Fix Scala types exposed in javadsl public APIs
    
    Co-authored-by: pjfanning <[email protected]>
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/776f4a51-985e-4753-a41a-9153bc3dd3e9
    
    * update
    
    * Fix ambiguous flowWithPassThrough calls in Java test files
    
    Co-authored-by: pjfanning <[email protected]>
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-connectors/sessions/fc286528-a630-4f9b-bee2-f22d587878d4
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../cassandra/javadsl/CassandraFlow.scala          |  39 +++++-
 .../stream/connectors/slick/javadsl/Slick.scala    | 131 +++++++++++++++++++++
 .../javadsl/DocSnippetFlowWithPassThrough.java     |   3 +-
 slick/src/test/java/docs/javadsl/SlickTest.java    |  11 +-
 4 files changed, 177 insertions(+), 7 deletions(-)

diff --git 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraFlow.scala
 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraFlow.scala
index 116aa19df..06be7daca 100644
--- 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraFlow.scala
+++ 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraFlow.scala
@@ -87,8 +87,11 @@ object CassandraFlow {
    * @param session implicit Cassandra session from `CassandraSessionRegistry`
    * @tparam T stream element type
    * @tparam K extracted key type for grouping into batches
+   * @deprecated Use [[createUnloggedBatch]] with 
`pekko.japi.function.Function2` for `statementBinder` instead (since 2.0.0).
    */
-  def createUnloggedBatch[T, K](session: CassandraSession,
+  @deprecated("Use createUnloggedBatch with pekko.japi.function.Function2 for 
statementBinder instead", "2.0.0")
+  @java.lang.Deprecated
+  def createUnloggedBatchWithScalaStatementBinder[T, K](session: 
CassandraSession,
       writeSettings: CassandraWriteSettings,
       cqlStatement: String,
       statementBinder: (T, PreparedStatement) => BoundStatement,
@@ -101,4 +104,38 @@ object CassandraFlow {
       .asJava
   }
 
+  /**
+   * Creates a flow that uses 
[[com.datastax.oss.driver.api.core.cql.BatchStatement]] and groups the
+   * elements internally into batches using the `writeSettings` and per 
`groupingKey`.
+   * Use this when most of the elements in the stream share the same partition 
key.
+   *
+   * Cassandra batches that share the same partition key will only
+   * resolve to one write internally in Cassandra, boosting write performance.
+   *
+   * "A LOGGED batch to a single partition will be converted to an UNLOGGED 
batch as an optimization."
+   * ([[https://cassandra.apache.org/doc/latest/cql/dml.html#batch Batch CQL]])
+   *
+   * Be aware that this stage does NOT preserve the upstream order.
+   *
+   * @param writeSettings settings to configure the batching and the write 
operation
+   * @param cqlStatement raw CQL statement
+   * @param statementBinder function to bind data from the stream element to 
the prepared statement
+   * @param groupingKey groups the elements to go into the same batch
+   * @param session Cassandra session from `CassandraSessionRegistry`
+   * @tparam T stream element type
+   * @tparam K extracted key type for grouping into batches
+   */
+  def createUnloggedBatch[T, K](session: CassandraSession,
+      writeSettings: CassandraWriteSettings,
+      cqlStatement: String,
+      statementBinder: pekko.japi.function.Function2[T, PreparedStatement, 
BoundStatement],
+      groupingKey: pekko.japi.function.Function[T, K]): Flow[T, T, NotUsed] = {
+    scaladsl.CassandraFlow
+      .createBatch(writeSettings,
+        cqlStatement,
+        (t, preparedStatement) => statementBinder.apply(t, preparedStatement),
+        t => groupingKey.apply(t))(session.delegate)
+      .asJava
+  }
+
 }
diff --git 
a/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
 
b/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
index f4651941b..f1fb8104c 100644
--- 
a/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
+++ 
b/slick/src/main/scala/org/apache/pekko/stream/connectors/slick/javadsl/Slick.scala
@@ -16,6 +16,7 @@ package org.apache.pekko.stream.connectors.slick.javadsl
 import java.sql.Connection
 import java.sql.PreparedStatement
 import java.util.concurrent.CompletionStage
+import java.util.concurrent.Executor
 import java.util.function.{ Function => JFunction }
 import java.util.function.{ BiFunction => JBiFunction }
 
@@ -153,7 +154,10 @@ object Slick {
    *                    DDL statement is acceptable.
    * @param mapper A function to create a result from the incoming element T
    *               and the database statement result.
+   * @deprecated Use the overload with `java.util.concurrent.Executor` instead (since 2.0.0).
    */
+  @deprecated("Use flowWithPassThrough with java.util.concurrent.Executor 
instead", "2.0.0")
+  @java.lang.Deprecated
   def flowWithPassThrough[T, R](
       session: SlickSession,
       executionContext: ExecutionContext,
@@ -161,6 +165,29 @@ object Slick {
       mapper: JBiFunction[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
     flowWithPassThrough(session, executionContext, 1, toStatement, mapper)
 
+  /**
+   * Java API: creates a Flow that takes a stream of elements of
+   *           type T, transforms each element to a SQL statement
+   *           using the specified function, then executes
+   *           those statements against the specified Slick database
+   *           and allows to combine the statement result and element into a 
result type R.
+   *
+   * @param session The database session to use.
+   * @param executor Executor on which the mapper function is run.
+   *                 E.g. the dispatcher of the ActorSystem.
+   * @param toStatement A function that creates the SQL statement to
+   *                    execute from the current element. Any DML or
+   *                    DDL statement is acceptable.
+   * @param mapper A function to create a result from the incoming element T
+   *               and the database statement result.
+   */
+  def flowWithPassThrough[T, R](
+      session: SlickSession,
+      executor: Executor,
+      toStatement: JFunction[T, String],
+      mapper: JBiFunction[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
+    flowWithPassThrough(session, executor, 1, toStatement, mapper)
+
   /**
    * Java API: creates a Flow that takes a stream of elements of
    *           type T, transforms each element to a SQL statement
@@ -176,7 +203,10 @@ object Slick {
    *                    DDL statement is acceptable.
    * @param mapper A function to create a result from the incoming element T
    *               and the database statement result.
+   * @deprecated Use the overload with `java.util.concurrent.Executor` instead 
(since 2.0.0).
    */
+  @deprecated("Use flowWithPassThrough with java.util.concurrent.Executor 
instead", "2.0.0")
+  @java.lang.Deprecated
   def flowWithPassThrough[T, R](
       session: SlickSession,
       executionContext: ExecutionContext,
@@ -184,6 +214,29 @@ object Slick {
       mapper: Function2[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
     flowWithPassThrough(session, executionContext, 1, toStatement, mapper)
 
+  /**
+   * Java API: creates a Flow that takes a stream of elements of
+   *           type T, transforms each element to a SQL statement
+   *           using the specified function, then executes
+   *           those statements against the specified Slick database
+   *           and allows to combine the statement result and element into a 
result type R.
+   *
+   * @param session The database session to use.
+   * @param executor Executor on which the mapper function is run.
+   *                 E.g. the dispatcher of the ActorSystem.
+   * @param toStatement A function that creates the SQL statement to
+   *                    execute from the current element. Any DML or
+   *                    DDL statement is acceptable.
+   * @param mapper A function to create a result from the incoming element T
+   *               and the database statement result.
+   */
+  def flowWithPassThrough[T, R](
+      session: SlickSession,
+      executor: Executor,
+      toStatement: Function2[T, Connection, PreparedStatement],
+      mapper: Function2[T, java.lang.Integer, R]): Flow[T, R, NotUsed] =
+    flowWithPassThrough(session, executor, 1, toStatement, mapper)
+
   /**
    * Java API: creates a Flow that takes a stream of elements of
    *           type T, transforms each element to a SQL statement
@@ -202,7 +255,10 @@ object Slick {
    *                    DDL statement is acceptable.
    * @param mapper A function to create a result from the incoming element T
    *               and the database statement result.
+   * @deprecated Use the overload with `java.util.concurrent.Executor` instead (since 2.0.0).
    */
+  @deprecated("Use flowWithPassThrough with java.util.concurrent.Executor 
instead", "2.0.0")
+  @java.lang.Deprecated
   def flowWithPassThrough[T, R](
       session: SlickSession,
       executionContext: ExecutionContext,
@@ -218,6 +274,42 @@ object Slick {
         })(session)
       .asJava
 
+  /**
+   * Java API: creates a Flow that takes a stream of elements of
+   *           type T, transforms each element to a SQL statement
+   *           using the specified function, then executes
+   *           those statements against the specified Slick database
+   *           and allows to combine the statement result and element into a 
result type R.
+   *
+   * @param session The database session to use.
+   * @param executor Executor on which the mapper function is run.
+   *                 E.g. the dispatcher of the ActorSystem.
+   * @param parallelism How many parallel asynchronous streams should be
+   *                    used to send statements to the database. Use a
+   *                    value of 1 for sequential execution.
+   * @param toStatement A function that creates the SQL statement to
+   *                    execute from the current element. Any DML or
+   *                    DDL statement is acceptable.
+   * @param mapper A function to create a result from the incoming element T
+   *               and the database statement result.
+   */
+  def flowWithPassThrough[T, R](
+      session: SlickSession,
+      executor: Executor,
+      parallelism: Int,
+      toStatement: JFunction[T, String],
+      mapper: JBiFunction[T, java.lang.Integer, R]): Flow[T, R, NotUsed] = {
+    val executionContext = ExecutionContext.fromExecutor(executor)
+    ScalaSlick
+      .flowWithPassThrough[T, R](parallelism,
+        (t: T) => {
+          toDBIO(toStatement)
+            .apply(t)
+            .map(count => mapper.apply(t, count))(executionContext)
+        })(session)
+      .asJava
+  }
+
   /**
    * Java API: creates a Flow that takes a stream of elements of
    *           type T, transforms each element to a SQL statement
@@ -236,7 +328,10 @@ object Slick {
    *                    DDL statement is acceptable.
    * @param mapper A function to create a result from the incoming element T
    *               and the database statement result.
+   * @deprecated Use the overload with `java.util.concurrent.Executor` instead (since 2.0.0).
    */
+  @deprecated("Use flowWithPassThrough with java.util.concurrent.Executor 
instead", "2.0.0")
+  @java.lang.Deprecated
   def flowWithPassThrough[T, R](
       session: SlickSession,
       executionContext: ExecutionContext,
@@ -252,6 +347,42 @@ object Slick {
         })(session)
       .asJava
 
+  /**
+   * Java API: creates a Flow that takes a stream of elements of
+   *           type T, transforms each element to a SQL statement
+   *           using the specified function, then executes
+   *           those statements against the specified Slick database
+   *           and allows to combine the statement result and element into a 
result type R.
+   *
+   * @param session The database session to use.
+   * @param executor Executor on which the mapper function is run.
+   *                 E.g. the dispatcher of the ActorSystem.
+   * @param parallelism How many parallel asynchronous streams should be
+   *                    used to send statements to the database. Use a
+   *                    value of 1 for sequential execution.
+   * @param toStatement A function that creates the SQL statement to
+   *                    execute from the current element. Any DML or
+   *                    DDL statement is acceptable.
+   * @param mapper A function to create a result from the incoming element T
+   *               and the database statement result.
+   */
+  def flowWithPassThrough[T, R](
+      session: SlickSession,
+      executor: Executor,
+      parallelism: Int,
+      toStatement: Function2[T, Connection, PreparedStatement],
+      mapper: Function2[T, java.lang.Integer, R]): Flow[T, R, NotUsed] = {
+    val executionContext = ExecutionContext.fromExecutor(executor)
+    ScalaSlick
+      .flowWithPassThrough[T, R](parallelism,
+        (t: T) => {
+          toDBIO(toStatement)
+            .apply(t)
+            .map(count => mapper.apply(t, count))(executionContext)
+        })(session)
+      .asJava
+  }
+
   /**
    * Java API: creates a Sink that takes a stream of elements of
    *           type T, transforms each element to a SQL statement
diff --git 
a/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java 
b/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java
index 131d50ee6..8a24c6259 100644
--- a/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java
+++ b/slick/src/test/java/docs/javadsl/DocSnippetFlowWithPassThrough.java
@@ -25,6 +25,7 @@ import java.sql.PreparedStatement;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -84,7 +85,7 @@ public class DocSnippetFlowWithPassThrough {
             .via(
                 Slick.flowWithPassThrough(
                     session,
-                    system.dispatcher(),
+                    (Executor) system.dispatcher(),
                     // add an optional second argument to specify the 
parallelism factor (int)
                     (kafkaMessage, connection) -> {
                       PreparedStatement statement =
diff --git a/slick/src/test/java/docs/javadsl/SlickTest.java 
b/slick/src/test/java/docs/javadsl/SlickTest.java
index e11e34d05..ad40ce501 100644
--- a/slick/src/test/java/docs/javadsl/SlickTest.java
+++ b/slick/src/test/java/docs/javadsl/SlickTest.java
@@ -41,6 +41,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -243,7 +244,7 @@ public class SlickTest {
   @Test
   public void testFlowWithPassThroughWithoutParallelismAndReadBackWithSource() 
throws Exception {
     final Flow<User, User, NotUsed> slickFlow =
-        Slick.flowWithPassThrough(session, system.dispatcher(), insertUser, 
(user, i) -> user);
+        Slick.flowWithPassThrough(session, (Executor) system.dispatcher(), 
insertUser, (user, i) -> user);
     final List<User> insertedUsers =
         usersSource
             .via(slickFlow)
@@ -258,7 +259,7 @@ public class SlickTest {
   @Test
   public void 
testFlowPSWithPassThroughWithoutParallelismAndReadBackWithSource() throws 
Exception {
     final Flow<User, User, NotUsed> slickFlow =
-        Slick.flowWithPassThrough(session, system.dispatcher(), insertUserPS, 
(user, i) -> user);
+        Slick.flowWithPassThrough(session, (Executor) system.dispatcher(), 
insertUserPS, (user, i) -> user);
     final List<User> insertedUsers =
         usersSource
             .via(slickFlow)
@@ -273,7 +274,7 @@ public class SlickTest {
   @Test
   public void testFlowWithPassThroughWithParallelismOf4AndReadBackWithSource() 
throws Exception {
     final Flow<User, User, NotUsed> slickFlow =
-        Slick.flowWithPassThrough(session, system.dispatcher(), 4, insertUser, 
(user, i) -> user);
+        Slick.flowWithPassThrough(session, (Executor) system.dispatcher(), 4, 
insertUser, (user, i) -> user);
     final List<User> insertedUsers =
         usersSource
             .via(slickFlow)
@@ -288,7 +289,7 @@ public class SlickTest {
   @Test
   public void 
testFlowPSWithPassThroughWithParallelismOf4AndReadBackWithSource() throws 
Exception {
     final Flow<User, User, NotUsed> slickFlow =
-        Slick.flowWithPassThrough(session, system.dispatcher(), 4, 
insertUserPS, (user, i) -> user);
+        Slick.flowWithPassThrough(session, (Executor) system.dispatcher(), 4, 
insertUserPS, (user, i) -> user);
     final List<User> insertedUsers =
         usersSource
             .via(slickFlow)
@@ -325,7 +326,7 @@ public class SlickTest {
             .via(
                 Slick.flowWithPassThrough(
                     session,
-                    system.dispatcher(),
+                    (Executor) system.dispatcher(),
                     insertUserInKafkaMessage,
                     (kafkaMessage, insertCount) -> kafkaMessage.map(user -> 
insertCount)))
             .mapAsync(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to