This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 99312dcf9 Replace deprecated OrientDB APIs in orientdb connector
(#1553)
99312dcf9 is described below
commit 99312dcf9ee9b132e7ed10af1858263d093ec18c
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Apr 7 09:39:12 2026 +0200
Replace deprecated OrientDB APIs in orientdb connector (#1553)
* Replace deprecated OrientDB APIs: OPartitionedDatabasePool,
ODatabaseDocumentTx, OSQLSynchQuery
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/299c1de3-c73d-4239-bdd3-d7dbd1cd5184
Co-authored-by: pjfanning <[email protected]>
* Fix remaining deprecation warnings: replace OServerAdmin with OrientDB,
replace ODocument.save() with client.save()
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/dec73062-3e40-4181-a813-5aadacbc726c
Co-authored-by: pjfanning <[email protected]>
* Add @OVersion fields to typed POJO test classes to fix MVCC
OTransactionException
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/a525e280-9baf-4079-84d1-fbd47a4912f9
Co-authored-by: pjfanning <[email protected]>
* Create switch-away-from-deprecated-classes.backwards.excludes
* init the arraylist size
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
build.sbt | 4 +-
...away-from-deprecated-classes.backwards.excludes | 26 ++++++++
.../orientdb/OrientDbSourceSettings.scala | 12 ++--
.../orientdb/OrientDbWriteSettings.scala | 14 ++---
.../orientdb/impl/OrientDbFlowStage.scala | 8 +--
.../orientdb/impl/OrientDbSourceStage.scala | 69 ++++++++++++++++------
.../src/test/java/docs/javadsl/OrientDbTest.java | 55 +++++++++--------
.../test/scala/docs/scaladsl/OrientDbSpec.scala | 43 +++++++-------
8 files changed, 144 insertions(+), 87 deletions(-)
diff --git a/build.sbt b/build.sbt
index 58ef5ac7c..ef0b7de37 100644
--- a/build.sbt
+++ b/build.sbt
@@ -340,9 +340,7 @@ lazy val orientdb =
"orientdb",
"orientdb",
Dependencies.OrientDB,
- Test / fork := true,
- // note: orientdb client needs to be refactored to move off deprecated
calls
- fatalWarnings := false)
+ Test / fork := true)
lazy val reference = internalProject("reference", Dependencies.Reference)
.dependsOn(testkit % Test)
diff --git
a/orientdb/src/main/mima-filters/2.0.x.backward.excludes/switch-away-from-deprecated-classes.backwards.excludes
b/orientdb/src/main/mima-filters/2.0.x.backward.excludes/switch-away-from-deprecated-classes.backwards.excludes
new file mode 100644
index 000000000..21194c58b
--- /dev/null
+++
b/orientdb/src/main/mima-filters/2.0.x.backward.excludes/switch-away-from-deprecated-classes.backwards.excludes
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# switch away from deprecated orientdb classes
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbSourceSettings.create")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbSourceSettings.apply")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbSourceSettings.oDatabasePool")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbSourceSettings.withOrientDBCredentials")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbWriteSettings.create")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbWriteSettings.apply")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbWriteSettings.oDatabasePool")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.connectors.orientdb.OrientDbWriteSettings.withOrientDBCredentials")
diff --git
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbSourceSettings.scala
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbSourceSettings.scala
index 4b0ef46d6..a2afda5bb 100644
---
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbSourceSettings.scala
+++
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbSourceSettings.scala
@@ -13,21 +13,21 @@
package org.apache.pekko.stream.connectors.orientdb
-import com.orientechnologies.orient.core.db.OPartitionedDatabasePool
+import com.orientechnologies.orient.core.db.ODatabasePool
final class OrientDbSourceSettings private (
- val oDatabasePool:
com.orientechnologies.orient.core.db.OPartitionedDatabasePool,
+ val oDatabasePool: com.orientechnologies.orient.core.db.ODatabasePool,
val skip: Int,
val limit: Int) {
def withOrientDBCredentials(
- value: com.orientechnologies.orient.core.db.OPartitionedDatabasePool):
OrientDbSourceSettings =
+ value: com.orientechnologies.orient.core.db.ODatabasePool):
OrientDbSourceSettings =
copy(oDatabasePool = value)
def withSkip(value: Int): OrientDbSourceSettings = copy(skip = value)
def withLimit(value: Int): OrientDbSourceSettings = copy(limit = value)
private def copy(
- oDatabasePool:
com.orientechnologies.orient.core.db.OPartitionedDatabasePool = oDatabasePool,
+ oDatabasePool: com.orientechnologies.orient.core.db.ODatabasePool =
oDatabasePool,
skip: Int = skip,
limit: Int = limit): OrientDbSourceSettings = new OrientDbSourceSettings(
oDatabasePool = oDatabasePool,
@@ -45,11 +45,11 @@ final class OrientDbSourceSettings private (
object OrientDbSourceSettings {
/** Scala API */
- def apply(oDatabasePool: OPartitionedDatabasePool): OrientDbSourceSettings =
new OrientDbSourceSettings(
+ def apply(oDatabasePool: ODatabasePool): OrientDbSourceSettings = new
OrientDbSourceSettings(
oDatabasePool,
skip = 0,
limit = 10)
/** Java API */
- def create(oDatabasePool: OPartitionedDatabasePool): OrientDbSourceSettings
= apply(oDatabasePool)
+ def create(oDatabasePool: ODatabasePool): OrientDbSourceSettings =
apply(oDatabasePool)
}
diff --git
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbWriteSettings.scala
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbWriteSettings.scala
index 49a9c70d6..3e27231e1 100644
---
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbWriteSettings.scala
+++
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/OrientDbWriteSettings.scala
@@ -13,17 +13,17 @@
package org.apache.pekko.stream.connectors.orientdb
-import com.orientechnologies.orient.core.db.OPartitionedDatabasePool
+import com.orientechnologies.orient.core.db.ODatabasePool
final class OrientDbWriteSettings private (
- val oDatabasePool:
com.orientechnologies.orient.core.db.OPartitionedDatabasePool) {
+ val oDatabasePool: com.orientechnologies.orient.core.db.ODatabasePool) {
def withOrientDBCredentials(
- value: com.orientechnologies.orient.core.db.OPartitionedDatabasePool):
OrientDbWriteSettings =
+ value: com.orientechnologies.orient.core.db.ODatabasePool):
OrientDbWriteSettings =
copy(oDatabasePool = value)
private def copy(
- oDatabasePool:
com.orientechnologies.orient.core.db.OPartitionedDatabasePool):
OrientDbWriteSettings =
+ oDatabasePool: com.orientechnologies.orient.core.db.ODatabasePool):
OrientDbWriteSettings =
new OrientDbWriteSettings(
oDatabasePool = oDatabasePool)
@@ -36,10 +36,10 @@ final class OrientDbWriteSettings private (
object OrientDbWriteSettings {
/** Scala API */
- def apply(oDatabasePool: OPartitionedDatabasePool): OrientDbWriteSettings =
+ def apply(oDatabasePool: ODatabasePool): OrientDbWriteSettings =
new OrientDbWriteSettings(
- oDatabasePool: OPartitionedDatabasePool)
+ oDatabasePool: ODatabasePool)
/** Java API */
- def create(oDatabasePool: OPartitionedDatabasePool): OrientDbWriteSettings =
apply(oDatabasePool)
+ def create(oDatabasePool: ODatabasePool): OrientDbWriteSettings =
apply(oDatabasePool)
}
diff --git
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbFlowStage.scala
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbFlowStage.scala
index 11a2ad82d..6230f42f4 100644
---
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbFlowStage.scala
+++
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbFlowStage.scala
@@ -19,7 +19,8 @@ import pekko.stream._
import pekko.stream.connectors.orientdb.{ OrientDbWriteMessage,
OrientDbWriteSettings }
import pekko.stream.stage._
import com.orientechnologies.orient.`object`.db.OObjectDatabaseTx
-import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx
+import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal
+import com.orientechnologies.orient.core.db.ODatabaseSession
import com.orientechnologies.orient.core.record.ORecord
import com.orientechnologies.orient.core.record.impl.ODocument
import com.orientechnologies.orient.core.tx.OTransaction
@@ -54,13 +55,12 @@ private[orientdb] class OrientDbFlowStage[T, C](
sealed abstract class OrientDbLogic extends GraphStageLogic(shape) with
InHandler with OutHandler {
- protected var client: ODatabaseDocumentTx = _
+ protected var client: ODatabaseSession = _
protected var oObjectClient: OObjectDatabaseTx = _
override def preStart(): Unit = {
client = settings.oDatabasePool.acquire()
- oObjectClient = new OObjectDatabaseTx(client)
- client.setDatabaseOwner(oObjectClient)
+ oObjectClient = new
OObjectDatabaseTx(client.asInstanceOf[ODatabaseDocumentInternal])
}
override def postStop(): Unit = {
diff --git
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbSourceStage.scala
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbSourceStage.scala
index 4c83e694b..0b91a2655 100644
---
a/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbSourceStage.scala
+++
b/orientdb/src/main/scala/org/apache/pekko/stream/connectors/orientdb/impl/OrientDbSourceStage.scala
@@ -21,10 +21,11 @@ import pekko.stream.connectors.orientdb.{
OrientDbReadResult, OrientDbSourceSett
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import pekko.stream.{ ActorAttributes, Attributes, Outlet, SourceShape }
import com.orientechnologies.orient.`object`.db.OObjectDatabaseTx
-import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx
-import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery
+import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal
+import com.orientechnologies.orient.core.db.ODatabaseSession
import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
/**
* INTERNAL API
@@ -48,15 +49,25 @@ private[orientdb] final class
OrientDbSourceStage[T](className: String,
query match {
case Some(q) =>
new Logic {
- override protected def runQuery(): util.List[T] =
- client.query[util.List[T]](new OSQLSynchQuery[T](q))
-
+ override protected def runQuery(): util.List[T] = {
+ val rs = client.query(q)
+ val results = newArrayListWithSize(rs.estimateSize())
+ try {
+ while (rs.hasNext)
results.add(rs.next().toElement.asInstanceOf[T])
+ } finally rs.close()
+ results
+ }
}
case None =>
new Logic {
- override protected def runQuery(): util.List[T] =
- client.query[util.List[T]](
- new OSQLSynchQuery[T](s"SELECT * FROM $className SKIP
${skip} LIMIT ${settings.limit}"))
+ override protected def runQuery(): util.List[T] = {
+ val rs = client.query(s"SELECT * FROM $className SKIP ${skip}
LIMIT ${settings.limit}")
+ val results = newArrayListWithSize(rs.estimateSize())
+ try {
+ while (rs.hasNext)
results.add(rs.next().toElement.asInstanceOf[T])
+ } finally rs.close()
+ results
+ }
}
}
@@ -70,9 +81,16 @@ private[orientdb] final class
OrientDbSourceStage[T](className: String,
}
override protected def runQuery(): util.List[T] = {
- client.setDatabaseOwner(oObjectClient)
- oObjectClient.getEntityManager.registerEntityClass(c)
- oObjectClient.query[util.List[T]](new OSQLSynchQuery[T](q))
+ val rs = oObjectClient.query(q)
+ val results = newArrayListWithSize(rs.estimateSize())
+ try {
+ while (rs.hasNext) {
+ rs.next().getRecord().toScala.foreach { record =>
+ results.add(oObjectClient.getUserObjectByRecord(record,
null).asInstanceOf[T])
+ }
+ }
+ } finally rs.close()
+ results
}
}
case None =>
@@ -82,26 +100,39 @@ private[orientdb] final class
OrientDbSourceStage[T](className: String,
oObjectClient.getEntityManager.registerEntityClass(c)
}
- override protected def runQuery(): util.List[T] =
- oObjectClient
- .query[util.List[T]](
- new OSQLSynchQuery[T](
- s"SELECT * FROM $className SKIP ${skip} LIMIT
${settings.limit}"))
+ override protected def runQuery(): util.List[T] = {
+ val rs =
+ oObjectClient.query(s"SELECT * FROM $className SKIP ${skip}
LIMIT ${settings.limit}")
+ val results = newArrayListWithSize(rs.estimateSize())
+ try {
+ while (rs.hasNext) {
+ rs.next().getRecord().toScala.foreach { record =>
+ results.add(oObjectClient.getUserObjectByRecord(record,
null).asInstanceOf[T])
+ }
+ }
+ } finally rs.close()
+ results
+ }
}
}
}
+ // if the size is larger than Int.MaxValue, we will just create an ArrayList
with a default
+ // size of 1000 and let it grow as needed - we assume that estimateSize() is
just a hint
+ private def newArrayListWithSize(size: Long): util.ArrayList[T] =
+ if (size > Int.MaxValue) new util.ArrayList[T](1000)
+ else new util.ArrayList[T](math.max(size.toInt, 0))
+
private abstract class Logic extends GraphStageLogic(shape) with OutHandler {
- protected var client: ODatabaseDocumentTx = _
+ protected var client: ODatabaseSession = _
protected var oObjectClient: OObjectDatabaseTx = _
protected var skip = settings.skip
override def preStart(): Unit = {
client = settings.oDatabasePool.acquire()
- oObjectClient = new OObjectDatabaseTx(client)
- client.setDatabaseOwner(oObjectClient)
+ oObjectClient = new
OObjectDatabaseTx(client.asInstanceOf[ODatabaseDocumentInternal])
}
override def postStop(): Unit =
diff --git a/orientdb/src/test/java/docs/javadsl/OrientDbTest.java
b/orientdb/src/test/java/docs/javadsl/OrientDbTest.java
index d602d85db..ad516acb5 100644
--- a/orientdb/src/test/java/docs/javadsl/OrientDbTest.java
+++ b/orientdb/src/test/java/docs/javadsl/OrientDbTest.java
@@ -26,12 +26,16 @@ import
org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.testkit.javadsl.TestKit;
-import com.orientechnologies.orient.client.remote.OServerAdmin;
+import com.orientechnologies.orient.core.annotation.OVersion;
+import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
+import com.orientechnologies.orient.core.db.ODatabaseType;
+import com.orientechnologies.orient.core.db.OrientDB;
+import com.orientechnologies.orient.core.db.OrientDBConfig;
// #init-settings
-import com.orientechnologies.orient.core.db.OPartitionedDatabasePool;
+import com.orientechnologies.orient.core.db.ODatabasePool;
// #init-settings
-import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
+import com.orientechnologies.orient.core.db.ODatabaseSession;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.object.db.OObjectDatabaseTx;
import org.junit.AfterClass;
@@ -54,16 +58,15 @@ import static org.junit.Assert.assertEquals;
public class OrientDbTest {
@Rule public final LogCapturingJunit4 logCapturing = new
LogCapturingJunit4();
- private static OServerAdmin oServerAdmin;
- private static OPartitionedDatabasePool oDatabase;
- private static ODatabaseDocumentTx client;
+ private static OrientDB orientDB;
+ private static ODatabasePool oDatabase;
+ private static ODatabaseSession client;
private static ActorSystem system;
// #init-settings
private static String url = "remote:127.0.0.1:2424/";
private static String dbName = "GratefulDeadConcertsJava";
- private static String dbUrl = url + dbName;
private static String username = "root";
private static String password = "root";
// #init-settings
@@ -78,6 +81,7 @@ public class OrientDbTest {
public static class source1 {
private String book_title;
+ @OVersion private Integer version;
public void setBook_title(String book_title) {
this.book_title = book_title;
@@ -91,6 +95,7 @@ public class OrientDbTest {
public static class sink2 {
private String book_title;
+ @OVersion private Integer version;
public void setBook_title(String book_title) {
this.book_title = book_title;
@@ -151,16 +156,12 @@ public class OrientDbTest {
public static void setup() throws Exception {
system = ActorSystem.create();
- oServerAdmin = new OServerAdmin(url).connect(username, password);
- if (!oServerAdmin.existsDatabase(dbName, "plocal")) {
- oServerAdmin.createDatabase(dbName, "document", "plocal");
- }
+ orientDB = new OrientDB(url, username, password,
OrientDBConfig.defaultConfig());
+ orientDB.createIfNotExists(dbName, ODatabaseType.PLOCAL);
// #init-settings
- oDatabase =
- new OPartitionedDatabasePool(
- dbUrl, username, password,
Runtime.getRuntime().availableProcessors(), 10);
+ oDatabase = new ODatabasePool(orientDB, dbName, username, password);
system.registerOnTermination(() -> oDatabase.close());
// #init-settings
@@ -185,13 +186,12 @@ public class OrientDbTest {
unregister(sink3);
unregister(sink6);
- if (oServerAdmin.existsDatabase(dbName, "plocal")) {
- oServerAdmin.dropDatabase(dbName, "plocal");
- }
- oServerAdmin.close();
-
client.close();
oDatabase.close();
+ if (orientDB.exists(dbName)) {
+ orientDB.drop(dbName);
+ }
+ orientDB.close();
TestKit.shutdownActorSystem(system);
}
@@ -204,7 +204,7 @@ public class OrientDbTest {
private static void flush(String className, String fieldName, String
fieldValue) {
ODocument oDocument = new ODocument().field(fieldName, fieldValue);
oDocument.setClassNameIfExists(className);
- oDocument.save();
+ client.save(oDocument);
}
private static void unregister(String className) {
@@ -274,8 +274,9 @@ public class OrientDbTest {
sourceClass, OrientDbSourceSettings.create(oDatabase),
source1.class, null)
.map(
readResult -> {
- ODatabaseDocumentTx db = oDatabase.acquire();
- db.setDatabaseOwner(new OObjectDatabaseTx(db));
+ ODatabaseDocumentInternal db =
+ (ODatabaseDocumentInternal) oDatabase.acquire();
+ new OObjectDatabaseTx(db);
ODatabaseRecordThreadLocal.instance().set(db);
sink2 sink = new sink2();
sink.setBook_title(readResult.oDocument().getBook_title());
@@ -295,8 +296,9 @@ public class OrientDbTest {
sinkClass2, OrientDbSourceSettings.create(oDatabase),
sink2.class, null)
.map(
m -> {
- ODatabaseDocumentTx db = oDatabase.acquire();
- db.setDatabaseOwner(new OObjectDatabaseTx(db));
+ ODatabaseDocumentInternal db =
+ (ODatabaseDocumentInternal) oDatabase.acquire();
+ new OObjectDatabaseTx(db);
ODatabaseRecordThreadLocal.instance().set(db);
return m.oDocument().getBook_title();
})
@@ -351,8 +353,9 @@ public class OrientDbTest {
.via(OrientDbFlow.createWithPassThrough(sink6,
OrientDbWriteSettings.create(oDatabase)))
.map(
messages -> {
- ODatabaseDocumentTx db = oDatabase.acquire();
- db.setDatabaseOwner(new OObjectDatabaseTx(db));
+ ODatabaseDocumentInternal db =
+ (ODatabaseDocumentInternal) oDatabase.acquire();
+ new OObjectDatabaseTx(db);
ODatabaseRecordThreadLocal.instance().set(db);
messages.stream().forEach(message ->
commitToKafka.accept(message.passThrough()));
return NotUsed.getInstance();
diff --git a/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala
b/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala
index f4dff41f9..c6953f620 100644
--- a/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala
+++ b/orientdb/src/test/scala/docs/scaladsl/OrientDbSpec.scala
@@ -28,10 +28,13 @@ import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.testkit.TestKit
import com.orientechnologies.orient.`object`.db.OObjectDatabaseTx
-import com.orientechnologies.orient.client.remote.OServerAdmin
-import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx
+import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal
+import com.orientechnologies.orient.core.db.ODatabaseSession
+import com.orientechnologies.orient.core.db.ODatabaseType
+import com.orientechnologies.orient.core.db.OrientDB
+import com.orientechnologies.orient.core.db.OrientDBConfig
//#init-settings
-import com.orientechnologies.orient.core.db.OPartitionedDatabasePool
+import com.orientechnologies.orient.core.db.ODatabasePool
//#init-settings
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal
import com.orientechnologies.orient.core.record.impl.ODocument
@@ -55,7 +58,6 @@ class OrientDbSpec extends AnyWordSpec with Matchers with
BeforeAndAfterAll with
val url = "remote:127.0.0.1:2424/"
val dbName = "GratefulDeadConcertsScala"
- val dbUrl = s"$url$dbName"
val username = "root"
val password = "root"
// #init-settings
@@ -70,20 +72,18 @@ class OrientDbSpec extends AnyWordSpec with Matchers with
BeforeAndAfterAll with
case class Book(title: String)
// #define-class
- var oServerAdmin: OServerAdmin = _
- var oDatabase: OPartitionedDatabasePool = _
- var client: ODatabaseDocumentTx = _
+ var orientDB: OrientDB = _
+ var oDatabase: ODatabasePool = _
+ var client: ODatabaseSession = _
override def beforeAll() = {
- oServerAdmin = new OServerAdmin(url).connect(username, password)
- if (!oServerAdmin.existsDatabase(dbName, "plocal")) {
- oServerAdmin.createDatabase(dbName, "document", "plocal")
- }
+ orientDB = new OrientDB(url, username, password,
OrientDBConfig.defaultConfig())
+ orientDB.createIfNotExists(dbName, ODatabaseType.PLOCAL)
// #init-settings
- val oDatabase: OPartitionedDatabasePool =
- new OPartitionedDatabasePool(dbUrl, username, password,
Runtime.getRuntime.availableProcessors(), 10)
+ val oDatabase: ODatabasePool =
+ new ODatabasePool(orientDB, dbName, username, password)
system.registerOnTermination(() -> oDatabase.close())
// #init-settings
@@ -107,13 +107,12 @@ class OrientDbSpec extends AnyWordSpec with Matchers with
BeforeAndAfterAll with
unregister(sink5)
unregister(sink7)
- if (oServerAdmin.existsDatabase(dbName, "plocal")) {
- oServerAdmin.dropDatabase(dbName, "plocal")
- }
- oServerAdmin.close()
-
client.close()
oDatabase.close()
+ if (orientDB.exists(dbName)) {
+ orientDB.drop(dbName)
+ }
+ orientDB.close()
TestKit.shutdownActorSystem(system)
}
@@ -125,7 +124,7 @@ class OrientDbSpec extends AnyWordSpec with Matchers with
BeforeAndAfterAll with
val oDocument = new ODocument()
.field(fieldName, fieldValue)
oDocument.setClassNameIfExists(className)
- oDocument.save()
+ client.save(oDocument)
}
private def unregister(className: String): Unit =
@@ -232,9 +231,9 @@ class OrientDbSpec extends AnyWordSpec with Matchers with
BeforeAndAfterAll with
val streamCompletion: Future[Done] = OrientDbSource
.typed(sourceClass, OrientDbSourceSettings(oDatabase),
classOf[OrientDbTest.source1])
.map { (m: OrientDbReadResult[OrientDbTest.source1]) =>
- val db: ODatabaseDocumentTx = oDatabase.acquire
- db.setDatabaseOwner(new OObjectDatabaseTx(db))
- ODatabaseRecordThreadLocal.instance.set(db)
+ val db = oDatabase.acquire
+ new OObjectDatabaseTx(db.asInstanceOf[ODatabaseDocumentInternal])
+
ODatabaseRecordThreadLocal.instance.set(db.asInstanceOf[ODatabaseDocumentInternal])
val sink: OrientDbTest.sink2 = new OrientDbTest.sink2
sink.setBook_title(m.oDocument.getBook_title)
OrientDbWriteMessage(sink)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]