This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new c783de931 feat(idgen): Start Implementation of NoSQL with the ID 
Generation Framework (#2131)
c783de931 is described below

commit c783de9315a8e55bc4dbda9a073dd0a64f9553c0
Author: Adam Christian 
<[email protected]>
AuthorDate: Wed Sep 3 09:27:17 2025 -0700

    feat(idgen): Start Implementation of NoSQL with the ID Generation Framework 
(#2131)
    
    Create an ID Generation Framework.
    
    Related to #650 & #844
    
    
    Co-authored-by: Robert Stupp <[email protected]>
    Co-authored-by: Dmitri Bourlatchkov <[email protected]>
---
 gradle/libs.versions.toml                          |   6 +
 gradle/projects.main.properties                    |   6 +
 persistence/nosql/idgen/README.md                  |  55 ++
 persistence/nosql/idgen/api/build.gradle.kts       |  42 ++
 .../org/apache/polaris/ids/api/IdGenerator.java    |  45 ++
 .../apache/polaris/ids/api/IdGeneratorSpec.java    |  52 ++
 .../org/apache/polaris/ids/api/MonotonicClock.java |  69 +++
 .../polaris/ids/api/SnowflakeIdGenerator.java      |  60 ++
 persistence/nosql/idgen/impl/build.gradle.kts      |  69 +++
 .../polaris/ids/impl/MonotonicClockStress.java     | 192 ++++++
 .../ids/impl/SnowflakeIdGeneratorStress.java       |  98 +++
 .../polaris/ids/impl/MonotonicClockBench.java      |  91 +++
 .../ids/impl/SnowflakeIdGeneratorBench.java        | 158 +++++
 .../polaris/ids/impl/MonotonicClockImpl.java       | 343 +++++++++++
 .../ids/impl/SnowflakeIdGeneratorFactory.java      | 106 ++++
 .../polaris/ids/impl/SnowflakeIdGeneratorImpl.java | 408 +++++++++++++
 .../impl/src/main/resources/META-INF/beans.xml     |  24 +
 .../org.apache.polaris.ids.spi.IdGeneratorFactory  |  20 +
 .../polaris/ids/impl/TestMonotonicClockImpl.java   | 345 +++++++++++
 .../ids/impl/TestSnowflakeIdGeneratorImpl.java     | 662 +++++++++++++++++++++
 .../idgen/impl/src/test/resources/logback-test.xml |  32 +
 persistence/nosql/idgen/mocks/build.gradle.kts     |  42 ++
 .../polaris/ids/mocks/MutableMonotonicClock.java   | 103 ++++
 .../mocks/src/main/resources/META-INF/beans.xml    |  24 +
 persistence/nosql/idgen/spi/build.gradle.kts       |  43 ++
 .../apache/polaris/ids/spi/IdGeneratorFactory.java |  43 ++
 .../apache/polaris/ids/spi/IdGeneratorSource.java  |  27 +
 27 files changed, 3165 insertions(+)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index aad5bcb5f..7becf77c6 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -24,6 +24,7 @@ hive = "3.1.3"
 iceberg = "1.9.2" # Ensure to update the iceberg version in regtests to keep 
regtests up-to-date
 quarkus = "3.25.4"
 immutables = "2.11.3"
+jmh = "1.37"
 picocli = "4.7.7"
 scala212 = "2.12.19"
 spark35 = "3.5.6"
@@ -76,6 +77,9 @@ jandex = { module = "io.smallrye.jandex:jandex", version 
="3.4.0" }
 javax-servlet-api = { module = "javax.servlet:javax.servlet-api", version = 
"4.0.1" }
 junit-bom = { module = "org.junit:junit-bom", version = "5.13.4" }
 keycloak-admin-client = { module = "org.keycloak:keycloak-admin-client", 
version = "26.0.6" }
+jcstress-core = { module = "org.openjdk.jcstress:jcstress-core", version = 
"0.16" }
+jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" }
+jmh-generator-annprocess = { module = 
"org.openjdk.jmh:jmh-generator-annprocess", version.ref = "jmh" }
 logback-classic = { module = "ch.qos.logback:logback-classic", version = 
"1.5.18" }
 micrometer-bom = { module = "io.micrometer:micrometer-bom", version = "1.15.3" 
}
 microprofile-fault-tolerance-api = { module = 
"org.eclipse.microprofile.fault-tolerance:microprofile-fault-tolerance-api", 
version = "4.1.2" }
@@ -102,6 +106,8 @@ testcontainers-keycloak = { module = 
"com.github.dasniko:testcontainers-keycloak
 threeten-extra = { module = "org.threeten:threeten-extra", version = "1.8.0" }
 
 [plugins]
+jcstress = { id = "io.github.reyerizo.gradle.jcstress", version = "0.8.15" }
+jmh = { id = "me.champeau.jmh", version = "0.7.3" }
 openapi-generator = { id = "org.openapi.generator", version = "7.12.0" }
 quarkus = { id = "io.quarkus", version.ref = "quarkus" }
 rat = { id = "org.nosphere.apache.rat", version = "0.8.1" }
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index 27fdae355..ce9455948 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -48,3 +48,9 @@ polaris-extensions-federation-hive=extensions/federation/hive
 polaris-config-docs-annotations=tools/config-docs/annotations
 polaris-config-docs-generator=tools/config-docs/generator
 polaris-config-docs-site=tools/config-docs/site
+
+# id generation
+polaris-idgen-api=persistence/nosql/idgen/api
+polaris-idgen-impl=persistence/nosql/idgen/impl
+polaris-idgen-mocks=persistence/nosql/idgen/mocks
+polaris-idgen-spi=persistence/nosql/idgen/spi
diff --git a/persistence/nosql/idgen/README.md 
b/persistence/nosql/idgen/README.md
new file mode 100644
index 000000000..576a983cf
--- /dev/null
+++ b/persistence/nosql/idgen/README.md
@@ -0,0 +1,55 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+   http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Unique ID generation framework and monotonic clock
+
+Provides a framework and implementations for unique ID generation, including a 
monotonically increasing timestamp/clock
+source.
+
+Provides a
+[Snowflake-IDs](https://medium.com/@jitenderkmr/demystifying-snowflake-ids-a-unique-identifier-in-distributed-computing-72796a827c9d)
+implementation.
+
+Consuming production should primarily leverage the `IdGenerator` and 
`MonotonicClock` interfaces.
+
+## Snowflake ID source
+
+The Snowflake ID source is configurable for each backend instance, but cannot 
be modified for an existing backend
+instance to prevent ID conflicts.
+
+The epoch of these timestamps is 2025-03-01-00:00:00.0 GMT. Timestamps occupy 
41 bits at
+millisecond precision, which lasts for about 69 years. Node-IDs are 10 bits, 
which allows 1024 concurrently active
+"JVMs running Polaris". 12 bits are used by the sequence number, which then 
allows each node to generate 4096 IDs per
+millisecond. One bit is reserved for future use.
+
+Node IDs are leased by every "JVM running Polaris" for a period of time. The 
ID generator implementation guarantees
+that no IDs will be generated for a timestamp that exceeds the "lease time". 
Leases can be extended. The implementation
+leverages atomic database operations (CAS) for the lease implementation.
+
+ID generators must not use timestamps before or after the lease period nor 
must they re-use an older timestamp. This
+requirement is satisfied using a monotonic clock implementation.
+
+## Code structure
+
+The code is structured into multiple modules. Consuming code should almost 
always pull in only the API module.
+
+* `polaris-idgen-api` provides the necessary Java interfaces and immutable 
types.
+* `polaris-idgen-impl` provides the storage agnostic implementation.
+* `polaris-idgen-mocks` provides mocks for testing.
+* `polaris-idgen-spi` provides the necessary interfaces to construct ID 
generators.
diff --git a/persistence/nosql/idgen/api/build.gradle.kts 
b/persistence/nosql/idgen/api/build.gradle.kts
new file mode 100644
index 000000000..91f02365a
--- /dev/null
+++ b/persistence/nosql/idgen/api/build.gradle.kts
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description = "Polaris ID generation API"
+
+dependencies {
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.inject.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+
+  compileOnly(libs.smallrye.config.core)
+  compileOnly(platform(libs.quarkus.bom))
+  compileOnly("io.quarkus:quarkus-core")
+
+  compileOnly(project(":polaris-immutables"))
+  annotationProcessor(project(":polaris-immutables", configuration = 
"processor"))
+
+  implementation(platform(libs.jackson.bom))
+  implementation("com.fasterxml.jackson.core:jackson-databind")
+}
diff --git 
a/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGenerator.java
 
b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGenerator.java
new file mode 100644
index 000000000..2bc7af8ef
--- /dev/null
+++ 
b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGenerator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.api;
+
+/** The primary interface for generating a contention-free ID. */
+public interface IdGenerator {
+  /** Generate a new, unique ID. */
+  long generateId();
+
+  /** Generate the system ID for a node, solely used for node management. */
+  long systemIdForNode(int nodeId);
+
+  default String describeId(long id) {
+    return Long.toString(id);
+  }
+
+  IdGenerator NONE =
+      new IdGenerator() {
+        @Override
+        public long generateId() {
+          throw new UnsupportedOperationException("NONE IdGenerator cannot 
generate IDs.");
+        }
+
+        @Override
+        public long systemIdForNode(int nodeId) {
+          throw new UnsupportedOperationException("NONE IdGenerator cannot 
generate IDs.");
+        }
+      };
+}
diff --git 
a/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGeneratorSpec.java
 
b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGeneratorSpec.java
new file mode 100644
index 000000000..b1d304c99
--- /dev/null
+++ 
b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/IdGeneratorSpec.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.api;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.WithDefault;
+import java.util.Map;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+@PolarisImmutable
+@JsonSerialize(as = ImmutableIdGeneratorSpec.class)
+@JsonDeserialize(as = ImmutableIdGeneratorSpec.class)
+public interface IdGeneratorSpec {
+  @WithDefault("snowflake")
+  String type();
+
+  Map<String, String> params();
+
+  @PolarisImmutable
+  interface BuildableIdGeneratorSpec extends IdGeneratorSpec {
+    static ImmutableBuildableIdGeneratorSpec.Builder builder() {
+      return ImmutableBuildableIdGeneratorSpec.builder();
+    }
+
+    @Override
+    Map<String, String> params();
+
+    @Override
+    @Value.Default
+    default String type() {
+      return "snowflake";
+    }
+  }
+}
diff --git 
a/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/MonotonicClock.java
 
b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/MonotonicClock.java
new file mode 100644
index 000000000..2bb6fa161
--- /dev/null
+++ 
b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/MonotonicClock.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.api;
+
+import java.time.Instant;
+
+/**
+ * Provides a clock providing the current time in milliseconds, microseconds 
and instant since
+ * 1970-01-01-00:00:00.000. The returned timestamp values increase 
monotonically.
+ *
+ * <p>The functions provide nanosecond/microsecond/millisecond precision, but 
not necessarily the
+ * same resolution (how frequently the value changes) - no guarantees are made.
+ *
+ * <p>Implementation <em>may</em> adjust to wall clocks advancing faster than 
the real time. If and
+ * how exactly depends on the implementation, as long as none of the time 
values available via this
+ * interface "goes backwards".
+ *
+ * <p>Implementer notes: {@link System#nanoTime() System.nanoTime()} does not 
guarantee that the
+ * values will be monotonically increasing when invocations happen from 
different
+ * CPUs/cores/threads.
+ *
+ * <p>A default implementation of {@link MonotonicClock} can be injected as an 
application scoped
+ * bean in CDI.
+ */
+public interface MonotonicClock extends AutoCloseable {
+  /**
+   * Current timestamp as microseconds since epoch, can be used as a 
monotonically increasing wall
+   * clock.
+   */
+  long currentTimeMicros();
+
+  /**
+   * Current timestamp as milliseconds since epoch, can be used as a 
monotonically increasing wall
+   * clock.
+   */
+  long currentTimeMillis();
+
+  /**
+   * Current instant with nanosecond precision, can be used as a monotonically 
increasing wall
+   * clock.
+   */
+  Instant currentInstant();
+
+  /** Monotonically increasing timestamp with nanosecond precision, not 
related to wall clock. */
+  long nanoTime();
+
+  void sleepMillis(long millis);
+
+  @Override
+  void close();
+
+  void waitUntilTimeMillisAdvanced();
+}
diff --git 
a/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/SnowflakeIdGenerator.java
 
b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/SnowflakeIdGenerator.java
new file mode 100644
index 000000000..a0b9b8722
--- /dev/null
+++ 
b/persistence/nosql/idgen/api/src/main/java/org/apache/polaris/ids/api/SnowflakeIdGenerator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.api;
+
+import jakarta.annotation.Nonnull;
+import java.time.Instant;
+import java.util.UUID;
+
+public interface SnowflakeIdGenerator extends IdGenerator {
+  /** Offset of the snowflake ID generator since the 1970-01-01T00:00:00Z 
epoch instant. */
+  Instant ID_EPOCH = Instant.parse("2025-03-01T00:00:00Z");
+
+  /**
+   * Offset of the snowflake ID generator in milliseconds since the 
1970-01-01T00:00:00Z epoch
+   * instant.
+   */
+  long ID_EPOCH_MILLIS = ID_EPOCH.toEpochMilli();
+
+  int DEFAULT_NODE_ID_BITS = 10;
+  int DEFAULT_TIMESTAMP_BITS = 41;
+  int DEFAULT_SEQUENCE_BITS = 12;
+
+  long constructId(long timestamp, long sequence, long node);
+
+  long timestampFromId(long id);
+
+  long timestampUtcFromId(long id);
+
+  long sequenceFromId(long id);
+
+  long nodeFromId(long id);
+
+  UUID idToTimeUuid(long id);
+
+  String idToString(long id);
+
+  long timeUuidToId(@Nonnull UUID uuid);
+
+  int timestampBits();
+
+  int sequenceBits();
+
+  int nodeIdBits();
+}
diff --git a/persistence/nosql/idgen/impl/build.gradle.kts 
b/persistence/nosql/idgen/impl/build.gradle.kts
new file mode 100644
index 000000000..abf245fed
--- /dev/null
+++ b/persistence/nosql/idgen/impl/build.gradle.kts
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  alias(libs.plugins.jmh)
+  alias(libs.plugins.jcstress)
+  id("polaris-server")
+}
+
+description = "Polaris ID generation implementation"
+
+dependencies {
+  implementation(project(":polaris-idgen-api"))
+  implementation(project(":polaris-idgen-spi"))
+
+  implementation(libs.guava)
+  implementation(libs.slf4j.api)
+
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.inject.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+
+  compileOnly(libs.smallrye.config.core)
+  compileOnly(platform(libs.quarkus.bom))
+  compileOnly("io.quarkus:quarkus-core")
+
+  compileOnly(project(":polaris-immutables"))
+  annotationProcessor(project(":polaris-immutables", configuration = 
"processor"))
+
+  implementation(platform(libs.jackson.bom))
+  implementation("com.fasterxml.jackson.core:jackson-databind")
+
+  testFixturesCompileOnly(platform(libs.jackson.bom))
+  testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-databind")
+
+  testFixturesCompileOnly(libs.jakarta.inject.api)
+  testFixturesCompileOnly(libs.jakarta.enterprise.cdi.api)
+
+  testImplementation(project(":polaris-idgen-mocks"))
+
+  jmhImplementation(libs.jmh.core)
+  jmhAnnotationProcessor(libs.jmh.generator.annprocess)
+}
+
+tasks.named("jcstressJar") { dependsOn("jandex") }
+
+tasks.named("compileJcstressJava") { dependsOn("jandex") }
+
+tasks.named("check") { dependsOn("jcstress") }
+
+jcstress { jcstressDependency = libs.jcstress.core.get().toString() }
diff --git 
a/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/MonotonicClockStress.java
 
b/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/MonotonicClockStress.java
new file mode 100644
index 000000000..75b8e745f
--- /dev/null
+++ 
b/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/MonotonicClockStress.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.impl;
+
+import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;
+import static org.openjdk.jcstress.annotations.Expect.FORBIDDEN;
+import static org.openjdk.jcstress.annotations.Expect.UNKNOWN;
+
+import java.time.Instant;
+import org.openjdk.jcstress.annotations.Actor;
+import org.openjdk.jcstress.annotations.Arbiter;
+import org.openjdk.jcstress.annotations.Description;
+import org.openjdk.jcstress.annotations.JCStressTest;
+import org.openjdk.jcstress.annotations.Outcome;
+import org.openjdk.jcstress.annotations.State;
+import org.openjdk.jcstress.infra.results.II_Result;
+
+public class MonotonicClockStress {
+  public static final MonotonicClockImpl CLOCK = new 
MonotonicClockImpl().start();
+
+  @JCStressTest
+  @Description("Verify that monotonicity is guaranteed across different 
threads (nanos).")
+  @Outcome.Outcomes({
+    @Outcome(id = "1, 1", expect = ACCEPTABLE, desc = "Both newer"),
+    @Outcome(id = "1, 0", expect = ACCEPTABLE, desc = "Newer + same time"),
+    @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Same time + newer"),
+    @Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Both same time"),
+    @Outcome(id = "-1, .*", expect = FORBIDDEN, desc = "Clock must not go 
backwards"),
+    @Outcome(id = ".*, -1", expect = FORBIDDEN, desc = "Clock must not go 
backwards"),
+    @Outcome(expect = UNKNOWN, desc = "Not sure what happened"),
+  })
+  @State()
+  public static class Nanos {
+    long ref;
+
+    long v1;
+    long v2;
+
+    public Nanos() {
+      ref = CLOCK.nanoTime();
+    }
+
+    @Actor
+    public void actor1() {
+      v1 = CLOCK.nanoTime();
+    }
+
+    @Actor
+    public void actor2() {
+      v2 = CLOCK.nanoTime();
+    }
+
+    @Arbiter
+    public void arbiter(II_Result r) {
+      r.r1 = Long.compare(v1, ref);
+      r.r2 = Long.compare(v2, ref);
+    }
+  }
+
+  @JCStressTest
+  @Description("Verify that monotonicity is guaranteed across different 
threads (micros).")
+  @Outcome.Outcomes({
+    @Outcome(id = "1, 1", expect = ACCEPTABLE, desc = "Both newer"),
+    @Outcome(id = "1, 0", expect = ACCEPTABLE, desc = "Newer + same time"),
+    @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Same time + newer"),
+    @Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Both same time"),
+    @Outcome(id = "-1, .*", expect = FORBIDDEN, desc = "Clock must not go 
backwards"),
+    @Outcome(id = ".*, -1", expect = FORBIDDEN, desc = "Clock must not go 
backwards"),
+    @Outcome(expect = UNKNOWN, desc = "Not sure what happened"),
+  })
+  @State()
+  public static class Micros {
+    long ref;
+
+    long v1;
+    long v2;
+
+    public Micros() {
+      ref = CLOCK.currentTimeMicros();
+    }
+
+    @Actor
+    public void actor1() {
+      v1 = CLOCK.currentTimeMicros();
+    }
+
+    @Actor
+    public void actor2() {
+      v2 = CLOCK.currentTimeMicros();
+    }
+
+    @Arbiter
+    public void arbiter(II_Result r) {
+      r.r1 = Long.compare(v1, ref);
+      r.r2 = Long.compare(v2, ref);
+    }
+  }
+
+  @JCStressTest
+  @Description("Verify that monotonicity is guaranteed across different 
threads (millis).")
+  @Outcome.Outcomes({
+    @Outcome(id = "1, 1", expect = ACCEPTABLE, desc = "Both newer"),
+    @Outcome(id = "1, 0", expect = ACCEPTABLE, desc = "Newer + same time"),
+    @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Same time + newer"),
+    @Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Both same time"),
+    @Outcome(id = "-1, .*", expect = FORBIDDEN, desc = "Clock must not go 
backwards"),
+    @Outcome(id = ".*, -1", expect = FORBIDDEN, desc = "Clock must not go 
backwards"),
+    @Outcome(expect = UNKNOWN, desc = "Not sure what happened"),
+  })
+  @State()
+  public static class Millis {
+    long ref;
+
+    long v1;
+    long v2;
+
+    public Millis() {
+      ref = CLOCK.currentTimeMillis();
+    }
+
+    @Actor
+    public void actor1() {
+      v1 = CLOCK.currentTimeMillis();
+    }
+
+    @Actor
+    public void actor2() {
+      v2 = CLOCK.currentTimeMillis();
+    }
+
+    @Arbiter
+    public void arbiter(II_Result r) {
+      r.r1 = Long.compare(v1, ref);
+      r.r2 = Long.compare(v2, ref);
+    }
+  }
+
+  @JCStressTest
+  @Description("Verify that monotonicity is guaranteed across different 
threads (instants).")
+  @Outcome.Outcomes({
+    @Outcome(id = "1, 1", expect = ACCEPTABLE, desc = "Both newer"),
+    @Outcome(id = "1, 0", expect = ACCEPTABLE, desc = "Newer + same time"),
+    @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "Same time + newer"),
+    @Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Both same time"),
+    @Outcome(id = "-1, .*", expect = FORBIDDEN, desc = "Clock must not go 
backwards"),
+    @Outcome(id = ".*, -1", expect = FORBIDDEN, desc = "Clock must not go 
backwards"),
+    @Outcome(expect = UNKNOWN, desc = "Not sure what happened"),
+  })
+  @State()
+  public static class Instants {
+    Instant ref;
+
+    Instant v1;
+    Instant v2;
+
+    public Instants() {
+      ref = CLOCK.currentInstant();
+    }
+
+    @Actor
+    public void actor1() {
+      v1 = CLOCK.currentInstant();
+    }
+
+    @Actor
+    public void actor2() {
+      v2 = CLOCK.currentInstant();
+    }
+
+    @Arbiter
+    public void arbiter(II_Result r) {
+      r.r1 = Integer.compare(v1.compareTo(ref), 0);
+      r.r2 = Integer.compare(v2.compareTo(ref), 0);
+    }
+  }
+}
diff --git 
a/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorStress.java
 
b/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorStress.java
new file mode 100644
index 000000000..8313e38cd
--- /dev/null
+++ 
b/persistence/nosql/idgen/impl/src/jcstress/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorStress.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.impl;
+
+import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;
+import static org.openjdk.jcstress.annotations.Expect.FORBIDDEN;
+import static org.openjdk.jcstress.annotations.Expect.UNKNOWN;
+
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.spi.IdGeneratorSource;
+import org.openjdk.jcstress.annotations.Actor;
+import org.openjdk.jcstress.annotations.Arbiter;
+import org.openjdk.jcstress.annotations.Description;
+import org.openjdk.jcstress.annotations.JCStressTest;
+import org.openjdk.jcstress.annotations.Outcome;
+import org.openjdk.jcstress.annotations.State;
+import org.openjdk.jcstress.infra.results.I_Result;
+import org.openjdk.jcstress.infra.results.Z_Result;
+
+public class SnowflakeIdGeneratorStress {
+  public static final MonotonicClockImpl CLOCK = new 
MonotonicClockImpl().start();
+  public static final IdGenerator IDGEN =
+      new SnowflakeIdGeneratorImpl(
+          new IdGeneratorSource() {
+            @Override
+            public int nodeId() {
+              return 42;
+            }
+
+            @Override
+            public long currentTimeMillis() {
+              return CLOCK.currentTimeMillis();
+            }
+          });
+
+  @JCStressTest
+  @Description("Verify that generated IDs are unique for the same thread.")
+  @Outcome.Outcomes({
+    @Outcome(id = "1", expect = ACCEPTABLE, desc = "Not equal, greater"),
+    @Outcome(id = "-1", expect = FORBIDDEN, desc = "Not equal, smaller"),
+    @Outcome(id = "0", expect = FORBIDDEN, desc = "Equal"),
+    @Outcome(expect = UNKNOWN, desc = "Not sure what happened"),
+  })
+  @State()
+  public static class SameThread {
+    @Actor
+    public void actor(I_Result r) {
+      var v1 = IDGEN.generateId();
+      var v2 = IDGEN.generateId();
+
+      r.r1 = Long.compare(v2, v1);
+    }
+  }
+
+  @JCStressTest
+  @Description("Verify that generated IDs are unique for the same thread.")
+  @Outcome.Outcomes({
+    @Outcome(id = "false", expect = ACCEPTABLE, desc = "Not equal"),
+    @Outcome(id = "true", expect = FORBIDDEN, desc = "Equal"),
+    @Outcome(expect = UNKNOWN, desc = "Not sure what happened"),
+  })
+  @State()
+  public static class DifferentThreads {
+    long v1;
+    long v2;
+
+    @Actor
+    public void actor1() {
+      v1 = IDGEN.generateId();
+    }
+
+    @Actor
+    public void actor2() {
+      v1 = IDGEN.generateId();
+    }
+
+    @Arbiter
+    public void arbiter(Z_Result r) {
+      r.r1 = v1 == v2;
+    }
+  }
+}
diff --git 
a/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/MonotonicClockBench.java
 
b/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/MonotonicClockBench.java
new file mode 100644
index 000000000..5575cfe82
--- /dev/null
+++ 
b/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/MonotonicClockBench.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.impl;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import java.time.Instant;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+@Warmup(iterations = 3, time = 1000, timeUnit = MILLISECONDS)
+@Measurement(iterations = 6, time = 1000, timeUnit = MILLISECONDS)
+@Fork(1)
+@Threads(4)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(NANOSECONDS)
+public class MonotonicClockBench {
+  @State(Scope.Benchmark)
+  public static class BenchmarkParam {
+    MonotonicClockImpl monotonicClock;
+    MonotonicClockImpl monotonicClockIdle;
+
+    @Setup
+    public void init() {
+      monotonicClock = new MonotonicClockImpl().start();
+      monotonicClockIdle = new MonotonicClockImpl();
+    }
+  }
+
+  @Threads(1)
+  @Benchmark
+  public void tick(BenchmarkParam param) {
+    param.monotonicClockIdle.tick();
+  }
+
+  @Benchmark
+  public long nanoTime(BenchmarkParam param) {
+    return param.monotonicClock.nanoTime();
+  }
+
+  @Benchmark
+  public long currentTimeMicros(BenchmarkParam param) {
+    return param.monotonicClock.currentTimeMicros();
+  }
+
+  @Benchmark
+  public long currentTimeMillis(BenchmarkParam param) {
+    return param.monotonicClock.currentTimeMillis();
+  }
+
+  @Benchmark
+  public Instant currentInstant(BenchmarkParam param) {
+    return param.monotonicClock.currentInstant();
+  }
+
+  @Benchmark
+  public long systemCurrentTimeMillis(BenchmarkParam param) {
+    return param.monotonicClock.systemCurrentTimeMillis();
+  }
+
+  @Benchmark
+  public long systemNanoTime(BenchmarkParam param) {
+    return param.monotonicClock.systemNanoTime();
+  }
+}
diff --git 
a/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorBench.java
 
b/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorBench.java
new file mode 100644
index 000000000..de0f79089
--- /dev/null
+++ 
b/persistence/nosql/idgen/impl/src/jmh/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorBench.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.impl;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static 
org.apache.polaris.ids.api.SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS;
+import static 
org.apache.polaris.ids.api.SnowflakeIdGenerator.DEFAULT_SEQUENCE_BITS;
+import static 
org.apache.polaris.ids.api.SnowflakeIdGenerator.DEFAULT_TIMESTAMP_BITS;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+import org.apache.polaris.ids.spi.IdGeneratorSource;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+@Warmup(iterations = 5, time = 1000, timeUnit = MILLISECONDS)
+@Measurement(iterations = 5, time = 1000, timeUnit = MILLISECONDS)
+@Fork(1)
+@Threads(4)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(NANOSECONDS)
+public class SnowflakeIdGeneratorBench {
+  @State(Scope.Benchmark)
+  public static class BenchmarkParam {
+    SnowflakeIdGenerator idGeneratorMonotonicClock;
+    SnowflakeIdGenerator idGeneratorMonotonicClockHugeSequence;
+    SnowflakeIdGenerator idGeneratorFakeClock;
+    MonotonicClock monotonicClock;
+
+    @Setup
+    public void init() {
+      monotonicClock = MonotonicClockImpl.newDefaultInstance();
+
+      var idGeneratorSource =
+          new IdGeneratorSource() {
+            @Override
+            public int nodeId() {
+              return 1;
+            }
+
+            @Override
+            public long currentTimeMillis() {
+              return monotonicClock.currentTimeMillis();
+            }
+          };
+      idGeneratorMonotonicClock =
+          new SnowflakeIdGeneratorFactory()
+              .buildIdGenerator(
+                  DEFAULT_TIMESTAMP_BITS,
+                  DEFAULT_SEQUENCE_BITS,
+                  DEFAULT_NODE_ID_BITS,
+                  monotonicClock.currentTimeMillis(),
+                  idGeneratorSource);
+
+      idGeneratorMonotonicClockHugeSequence =
+          new SnowflakeIdGeneratorFactory()
+              .buildIdGenerator(
+                  DEFAULT_TIMESTAMP_BITS - 15,
+                  DEFAULT_SEQUENCE_BITS + 23,
+                  DEFAULT_NODE_ID_BITS - 8,
+                  monotonicClock.currentTimeMillis(),
+                  idGeneratorSource);
+
+      var off = System.currentTimeMillis();
+      var fakeClock = new AtomicLong(off);
+      idGeneratorFakeClock =
+          new SnowflakeIdGeneratorImpl(
+              DEFAULT_TIMESTAMP_BITS,
+              DEFAULT_SEQUENCE_BITS,
+              DEFAULT_NODE_ID_BITS,
+              off,
+              new IdGeneratorSource() {
+                @Override
+                public int nodeId() {
+                  return 1;
+                }
+
+                @Override
+                public long currentTimeMillis() {
+                  return fakeClock.get();
+                }
+              }) {
+            @Override
+            void spinWaitSequence() {
+              fakeClock.incrementAndGet();
+            }
+          };
+    }
+  }
+
+  /**
+   * <em>WARNING</em>: This {@code generateIdMonotonicSource} benchmark relies 
indirectly on a real
+   * system clock via {@link MonotonicClockImpl} and is therefore not only 
slower because of hitting
+   * the OS clock but mostly because it spin-waits due to too many IDs are 
generated per
+   * millisecond. In other words: the times yielded by JMH <em>MUST NOT</em> 
be considered as
+   * runtimes in production, because it practically never happens that more 
than 4096 IDs are
+   * generated per millisecond.
+   */
+  @Benchmark
+  public long generateIdMonotonicSourceSpinning(BenchmarkParam param) {
+    return param.idGeneratorMonotonicClock.generateId();
+  }
+
+  /**
+   * Snowflake ID generation against a generator configured with an extremely 
high number of
+   * sequence-bits for the sole purpose of benchmarking ID generation with an 
extremely low chance
+   * of spinning.
+   */
+  @Benchmark
+  public long generateIdMonotonicSourceHugeSequence(BenchmarkParam param) {
+    return param.idGeneratorMonotonicClockHugeSequence.generateId();
+  }
+
+  /** */
+  @Benchmark
+  @Measurement(iterations = 50, time = 900, timeUnit = MICROSECONDS)
+  @Threads(1)
+  public long generateIdMonotonicSourceNotSpinning(BenchmarkParam param) {
+    return param.idGeneratorMonotonicClock.generateId();
+  }
+
+  /**
+   * Artificial benchmark to just measure the ID generator without spinning 
(waiting for the "next"
+   * millisecond).
+   */
+  @Benchmark
+  public long generateIdFakeClockSource(BenchmarkParam param) {
+    return param.idGeneratorFakeClock.generateId();
+  }
+}
diff --git 
a/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/MonotonicClockImpl.java
 
b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/MonotonicClockImpl.java
new file mode 100644
index 000000000..786dc095e
--- /dev/null
+++ 
b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/MonotonicClockImpl.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import com.google.common.annotations.VisibleForTesting;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monotonic clock implementation that leverages {@link System#nanoTime()} as 
the primary
+ * monotonically increasing time source, provided via {@link #nanoTime()}. 
{@link
+ * System#currentTimeMillis()} is used to provide a monotonically increasing 
wall clock provided via
+ * {@link #currentInstant()}, {@link #currentTimeMicros()} and {@link 
#currentTimeMillis()}.
+ *
+ * <p>The implementation starts a single "tick-thread" polling the wall clock 
to calculate the
+ * adjustment that is necessary to provide the values for {@code 
currentTime*()}.
+ *
+ * <p>Serving the current instant or "current time micros" however is a bit 
more complex, as the
+ * wall-clock source only has millisecond precision, but the instant has 
nanosecond precision. This
+ * means that the value returned for "current instant" needs to be created 
from the "nanosecond
+ * time" and involving an "adjustment" value. That "adjustment" is also 
updated by the tick-thread
+ * and represents the difference of the current nano-time and system wall 
clock, considering the
+ * fact that the system wall clock can go backwards or forwards or not being 
updated every
+ * millisecond.
+ *
+ * <p>This implementation expects that the wall clock in nanoseconds since 
epoch can be represented
+ * by the values in the range {@code 0 .. Long.MAX_VALUE}. This implementation 
<em>must</em> be
+ * adapted approaching the year 2262 (approx 292 years fit into this range).
+ *
+ * <p>Regarding "short-time" Thread.sleep() be aware of <a
+ * href="https://bugs.openjdk.org/browse/JDK-8306463";>JDK-8306463</a> and <a
+ * href="https://bugs.openjdk.org/browse/JDK-8305092";>JDK-8305092</a>.
+ *
+ * <p>Even with very minimal sleep durations, the actual sleep time depends on 
the OS and in
+ * particular its scheduler. Sleep times have been measured to vary between 
some microseconds up to
+ * 2ms.
+ */
+@ApplicationScoped
+@VisibleForTesting
+@SuppressWarnings("FieldCanBeStatic")
+public class MonotonicClockImpl implements MonotonicClock {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MonotonicClockImpl.class);
+
+  private volatile boolean stopTicker;
+  private volatile CountDownLatch tickerThreadLatch;
+
+  // TODO should the implementation only advance the wall clock gradually?
+  // TODO protect against accidental huge wall clock advances?
+  // TODO should the implementation maybe never adjust to an advanced 
wall-clock? (i.e. faster wall
+  //  clock than real time clock)
+
+  // Best-effort to have the volatile fields not in the same cache line as the 
object header
+
+  @SuppressWarnings("unused")
+  private final long _pad_1_0 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_1_1 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_1_2 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_1_3 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_1_4 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_1_5 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_1_6 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_1_7 = 0L;
+
+  private volatile long adjustToWallClockAsNanos;
+
+  @SuppressWarnings("unused")
+  private final long _pad_2_0 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_2_1 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_2_2 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_2_3 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_2_4 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_2_5 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_2_6 = 0L;
+
+  @SuppressWarnings("unused")
+  private final long _pad_2_7 = 0L;
+
+  private volatile long previousSystemNanoTime;
+  private static final AtomicLongFieldUpdater<MonotonicClockImpl>
+      PREVIOUS_SYSTEM_NANO_TIME_UPDATER =
+          AtomicLongFieldUpdater.newUpdater(MonotonicClockImpl.class, 
"previousSystemNanoTime");
+
+  @SuppressWarnings("resource")
+  @VisibleForTesting
+  public static MonotonicClock newDefaultInstance() {
+    return new MonotonicClockImpl().start();
+  }
+
+  // for CDI
+
+  /** Default "production" constructor. */
+  MonotonicClockImpl() {
+    setup();
+  }
+
+  protected void setup() {
+    var nowNanos = systemNanoTime();
+    PREVIOUS_SYSTEM_NANO_TIME_UPDATER.set(this, nowNanos);
+
+    var nowWallClockAsMillis = systemCurrentTimeMillis();
+
+    this.adjustToWallClockAsNanos = MILLISECONDS.toNanos(nowWallClockAsMillis) 
- nowNanos;
+  }
+
+  /** Constructor for {@code MutableMonotonicClock}. */
+  @SuppressWarnings("unused")
+  protected MonotonicClockImpl(boolean dummy) {}
+
+  long currentTimeNanos() {
+    return currentTimeNanos(monotonicSystemNanoTime());
+  }
+
+  private long currentTimeNanos(long nanos) {
+    nanos += this.adjustToWallClockAsNanos;
+    return nanos;
+  }
+
+  /** Called regularly to adjust to wall-clock drift, if the wall-clock adjust 
into the future. */
+  @VisibleForTesting
+  protected void tick() {
+    var nowNanos = monotonicSystemNanoTime();
+    var nowWallClockAsMillis = systemCurrentTimeMillis();
+
+    var expectedWallClockMillis = 
NANOSECONDS.toMillis(currentTimeNanos(nowNanos));
+    var advancedInMillis = nowWallClockAsMillis - expectedWallClockMillis;
+
+    // Only adjust if wall clock did not go backwards
+    if (advancedInMillis > 0) {
+      var adjustAsNanos = this.adjustToWallClockAsNanos;
+      this.adjustToWallClockAsNanos = adjustAsNanos + 
MILLISECONDS.toNanos(advancedInMillis);
+
+      afterAdjust();
+
+      // log a if the system wall clock adjustment is quite a sudden bump for 
a production system
+      if (advancedInMillis > 200) {
+        var l =
+            advancedInMillis > 30_000
+                ? LOGGER.atError()
+                : (advancedInMillis > 2_000 ? LOGGER.atWarn() : 
LOGGER.atInfo());
+        l.log("System wall clock adjustment advanced by {}", 
Duration.ofMillis(advancedInMillis));
+      }
+    }
+  }
+
+  @SuppressWarnings("BusyWait")
+  private void ticker() {
+    try {
+      tickerThreadLatch = new CountDownLatch(1);
+      while (!stopTicker) {
+        try {
+          Thread.sleep(50);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+        tick();
+      }
+    } finally {
+      tickerThreadLatch.countDown();
+    }
+  }
+
+  @PostConstruct
+  void startForCDI() {
+    checkState(!stopTicker, "Already started");
+
+    var t = new Thread(this::ticker, "Monotonic Clock Thread");
+    t.setDaemon(true);
+    t.start();
+  }
+
+  protected MonotonicClockImpl start() {
+    startForCDI();
+    return this;
+  }
+
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  @Override
+  @PreDestroy
+  public void close() {
+    stopTicker = true;
+    var t = tickerThreadLatch;
+    if (t != null) {
+      try {
+        t.await(1, MINUTES);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        tickerThreadLatch = null;
+      }
+    }
+  }
+
+  @Override
+  public long currentTimeMicros() {
+    return NANOSECONDS.toMicros(currentTimeNanos());
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return NANOSECONDS.toMillis(currentTimeNanos());
+  }
+
+  @Override
+  public Instant currentInstant() {
+    var adjustedNanos = currentTimeNanos();
+
+    var seconds = NANOSECONDS.toSeconds(adjustedNanos);
+    var nanoPart = adjustedNanos % SECONDS.toNanos(1);
+
+    return Instant.ofEpochSecond(seconds, nanoPart);
+  }
+
+  @Override
+  public long nanoTime() {
+    return monotonicSystemNanoTime();
+  }
+
+  @Override
+  public void sleepMillis(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void waitUntilTimeMillisAdvanced() {
+    var start = currentTimeMillis();
+    var now = 0L;
+    do {
+      try {
+        // The minimum interval is (at least up to Java 23 on Linux) is the 
time it takes the OS
+        // scheduler to switch tasks. That time is way higher than one 
nanosecond.
+        Thread.sleep(0, 1);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+      now = currentTimeMillis();
+    } while (start == now);
+  }
+
+  // Overridden by tests
+  @VisibleForTesting
+  protected void afterAdjust() {}
+
+  // Overridden by tests
+  @VisibleForTesting
+  protected long systemCurrentTimeMillis() {
+    return System.currentTimeMillis();
+  }
+
+  // Overridden by tests
+  @VisibleForTesting
+  protected long systemNanoTime() {
+    return System.nanoTime();
+  }
+
+  /**
+   * {@link System#nanoTime() System.nanoTime()} does not guarantee that the 
values will be
+   * monotonically increasing when invocations happen from different 
CPUs/cores.
+   *
+   * <p>This function guarantees that the returned value is always equal to or 
greater than the last
+   * returned value.
+   *
+   * <p>Adding a "simple unit test" for this function is extremely tricky, 
because every
+   * synchronization added to a test "breaks" real concurrency, which is 
however what needs to be
+   * tested.
+   */
+  private long monotonicSystemNanoTime() {
+    while (true) {
+      var nanos = systemNanoTime();
+      var last = PREVIOUS_SYSTEM_NANO_TIME_UPDATER.get(this);
+      var diff = nanos - last;
+      // Attention! 'diff' can be negative!
+      if (diff > 0L) {
+        if (PREVIOUS_SYSTEM_NANO_TIME_UPDATER.compareAndSet(this, last, 
nanos)) {
+          return nanos;
+        }
+      } else if (diff == 0L) {
+        return nanos;
+      }
+      Thread.onSpinWait();
+    }
+  }
+}
diff --git 
a/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorFactory.java
 
b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorFactory.java
new file mode 100644
index 000000000..aece3a184
--- /dev/null
+++ 
b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.impl;
+
+import static 
org.apache.polaris.ids.impl.SnowflakeIdGeneratorImpl.validateArguments;
+
+import java.time.Instant;
+import java.util.Map;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+import org.apache.polaris.ids.spi.IdGeneratorFactory;
+import org.apache.polaris.ids.spi.IdGeneratorSource;
+
+public class SnowflakeIdGeneratorFactory implements 
IdGeneratorFactory<SnowflakeIdGenerator> {
+  @Override
+  public void validateParameters(Map<String, String> params, IdGeneratorSource 
idGeneratorSource) {
+    int timestampBits =
+        Integer.parseInt(
+            params.getOrDefault(
+                "timestamp-bits", "" + 
SnowflakeIdGenerator.DEFAULT_TIMESTAMP_BITS));
+    int nodeIdBits =
+        Integer.parseInt(
+            params.getOrDefault("node-id-bits", "" + 
SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS));
+    int sequenceBits =
+        Integer.parseInt(
+            params.getOrDefault("sequence-bits", "" + 
SnowflakeIdGenerator.DEFAULT_SEQUENCE_BITS));
+    var idEpochMillis = SnowflakeIdGenerator.ID_EPOCH_MILLIS;
+    var idEpochMillisFromParams = params.get("offset");
+    if (idEpochMillisFromParams != null) {
+      idEpochMillis = Instant.parse(idEpochMillisFromParams).toEpochMilli();
+    }
+
+    validateArguments(timestampBits, sequenceBits, nodeIdBits, idEpochMillis, 
idGeneratorSource);
+  }
+
+  @Override
+  public SnowflakeIdGenerator buildSystemIdGenerator(Map<String, String> 
params) {
+    return buildIdGenerator(
+        params,
+        new IdGeneratorSource() {
+          @Override
+          public int nodeId() {
+            return 0;
+          }
+
+          @Override
+          public long currentTimeMillis() {
+            return SnowflakeIdGenerator.ID_EPOCH_MILLIS;
+          }
+        });
+  }
+
+  @Override
+  public SnowflakeIdGenerator buildIdGenerator(
+      Map<String, String> params, IdGeneratorSource idGeneratorSource) {
+    int timestampBits =
+        Integer.parseInt(
+            params.getOrDefault(
+                "timestamp-bits", "" + 
SnowflakeIdGenerator.DEFAULT_TIMESTAMP_BITS));
+    int nodeIdBits =
+        Integer.parseInt(
+            params.getOrDefault("node-id-bits", "" + 
SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS));
+    int sequenceBits =
+        Integer.parseInt(
+            params.getOrDefault("sequence-bits", "" + 
SnowflakeIdGenerator.DEFAULT_SEQUENCE_BITS));
+    // ATCFIX - This name is incorrect.
+    var idEpochMillis = SnowflakeIdGenerator.ID_EPOCH_MILLIS;
+    var offset = params.get("offset");
+    if (offset != null) {
+      idEpochMillis = Instant.parse(offset).toEpochMilli();
+    }
+
+    return buildIdGenerator(
+        timestampBits, sequenceBits, nodeIdBits, idEpochMillis, 
idGeneratorSource);
+  }
+
+  public SnowflakeIdGenerator buildIdGenerator(
+      int timestampBits,
+      int sequenceBits,
+      int nodeIdBits,
+      long offsetMillis,
+      IdGeneratorSource idGeneratorSource) {
+    return new SnowflakeIdGeneratorImpl(
+        timestampBits, sequenceBits, nodeIdBits, offsetMillis, 
idGeneratorSource);
+  }
+
+  @Override
+  public String name() {
+    return "snowflake";
+  }
+}
diff --git 
a/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorImpl.java
 
b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorImpl.java
new file mode 100644
index 000000000..a2727a30d
--- /dev/null
+++ 
b/persistence/nosql/idgen/impl/src/main/java/org/apache/polaris/ids/impl/SnowflakeIdGeneratorImpl.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+
+import com.google.common.annotations.VisibleForTesting;
+import jakarta.annotation.Nonnull;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+import org.apache.polaris.ids.spi.IdGeneratorSource;
+
+/**
+ * Implementation of a local, per-node generator for so-called "snowflake 
IDs", which are unique
+ * integer IDs in a distributed environment.
+ *
+ * <p>A monotonically increasing clock is <em>strictly required</em>. 
Invocations of {@link
+ * #generateId()} fail hard, if the clock walks backwards, which means it 
returns a lower value than
+ * before. It is recommended to use an implementation of {@link 
MonotonicClock} as the clock source.
+ *
+ * <p>The implementation is thread-safe.
+ *
+ * <p>Reference: <a
+ * 
href="https://medium.com/@jitenderkmr/demystifying-snowflake-ids-a-unique-identifier-in-distributed-computing-72796a827c9d";>Article
+ * on medium.com</a>, <a
+ * 
href="https://github.com/twitter-archive/snowflake/tree/b3f6a3c6ca8e1b6847baa6ff42bf72201e2c2231";>Twitter
+ * GitHub repository (archived)</a>
+ */
+class SnowflakeIdGeneratorImpl implements SnowflakeIdGenerator {
+
+  // TODO add a specialized implementation using hard-coded values for the 
standardized parameters
+
+  private static final AtomicLongFieldUpdater<SnowflakeIdGeneratorImpl> 
LAST_ID_UPDATER =
+      AtomicLongFieldUpdater.newUpdater(SnowflakeIdGeneratorImpl.class, 
"lastId");
+
+  private final IdGeneratorSource idGeneratorSource;
+
+  // Used in hot generateId()
+  private volatile long lastId;
+  private final long idEpoch;
+  private final long timestampMax;
+  private final int timestampShift;
+  private final int sequenceBits;
+  private final long sequenceMask;
+  private final long nodeMask;
+
+  SnowflakeIdGeneratorImpl(IdGeneratorSource idGeneratorSource) {
+    this(
+        DEFAULT_TIMESTAMP_BITS,
+        DEFAULT_SEQUENCE_BITS,
+        DEFAULT_NODE_ID_BITS,
+        ID_EPOCH_MILLIS,
+        idGeneratorSource);
+  }
+
+  SnowflakeIdGeneratorImpl(
+      int timestampBits,
+      int sequenceBits,
+      int nodeBits,
+      long idEpoch,
+      IdGeneratorSource idGeneratorSource) {
+    validateArguments(timestampBits, sequenceBits, nodeBits, idEpoch, 
idGeneratorSource);
+    this.timestampShift = sequenceBits + nodeBits;
+    this.timestampMax = 1L << timestampBits;
+    this.nodeMask = (1L << nodeBits) - 1;
+    this.sequenceBits = sequenceBits;
+    this.sequenceMask = (1L << sequenceBits) - 1;
+    this.idEpoch = idEpoch;
+    this.idGeneratorSource = idGeneratorSource;
+  }
+
+  static void validateArguments(
+      int timestampBits,
+      int sequenceBits,
+      int nodeBits,
+      long idEpochMillis,
+      IdGeneratorSource idGeneratorSource) {
+    var nowMillis = idGeneratorSource != null ? 
idGeneratorSource.currentTimeMillis() : -1;
+    var now = Instant.ofEpochMilli(nowMillis);
+    var timestampMax = 1L << timestampBits;
+    checkArgs(
+        () -> checkArgument(idGeneratorSource != null, "IdGeneratorSource must 
not be null"),
+        () ->
+            checkArgument(
+                nowMillis >= idEpochMillis,
+                "Clock returns a timestamp %s less than the configured epoch 
%s",
+                now,
+                Instant.ofEpochMilli(idEpochMillis)),
+        () ->
+            checkArgument(
+                nowMillis - idEpochMillis < timestampMax,
+                "Clock already returns a timestamp %s greater of after %s",
+                now,
+                Instant.ofEpochMilli(timestampMax)),
+        () ->
+            checkArgument(
+                nodeBits >= 2
+                    && sequenceBits >= 5
+                    && timestampBits >= 5 // this is REALLY low !
+                    && nodeBits < 64
+                    && sequenceBits < 64
+                    && timestampBits < 64,
+                "value of nodeBits %s or sequenceBits %s or timestampBits %s 
is too low or too high",
+                nodeBits,
+                sequenceBits,
+                timestampBits),
+        () ->
+            checkArgument(
+                timestampBits + nodeBits + sequenceBits == 63,
+                "Sum of timestampBits + nodeBits + sequenceBits must be == 
63"),
+        () -> {
+          if (idGeneratorSource != null) {
+            var nodeId = idGeneratorSource.nodeId();
+            var nodeMax = 1L << nodeBits;
+            checkArgument(
+                nodeId >= 0 && nodeId < nodeMax, "nodeId %s out of range 
[0..%s[", nodeId, nodeMax);
+          }
+        });
+  }
+
+  static void checkArgs(Runnable... checks) {
+    var violations = new ArrayList<String>();
+    for (Runnable check : checks) {
+      try {
+        check.run();
+      } catch (IllegalArgumentException iae) {
+        violations.add(iae.getMessage());
+      }
+    }
+    if (!violations.isEmpty()) {
+      throw new IllegalArgumentException(String.join(", ", violations));
+    }
+  }
+
+  @Override
+  public long systemIdForNode(int nodeId) {
+    return constructIdUnsafe(timestampMax - 1, 0, nodeId);
+  }
+
+  private long constructIdUnsafe(long timestamp, long sequence, long nodeId) {
+    return (timestamp << timestampShift) | (nodeId << sequenceBits) | sequence;
+  }
+
+  @Override
+  public long constructId(long timestamp, long sequence, long nodeId) {
+    checkArgument(
+        (timestamp & (timestampMax - 1)) != timestampMax - 1,
+        "timestamp argument %s out of range",
+        timestamp);
+    checkArgument(
+        (sequence & sequenceMask) == sequence, "sequence argument %s out of 
range", sequence);
+    checkArgument((nodeId & nodeMask) == nodeId, "nodeId argument %s out of 
range", nodeId);
+    return constructIdUnsafe(timestamp, sequence, nodeId);
+  }
+
+  @Override
+  public long generateId() {
+    var nodeId = idGeneratorSource.nodeId();
+    checkState(nodeId >= 0, "Cannot generate a new ID, shutting down?");
+    var nodeIdPattern = ((long) nodeId) << sequenceBits;
+
+    var needTimestamp = true;
+    var timestamp = 0L;
+
+    while (true) {
+      var last = LAST_ID_UPDATER.get(this);
+      var lastTimestamp = timestampFromId(last);
+
+      if (needTimestamp || timestamp < lastTimestamp) {
+        // MUST query the clock AFTER fetching 'lastId', otherwise a 
concurrent thread might update
+        // 'lastId' with a newer clock value and the monotonic-clock-source 
check would fail.
+        timestamp = idGeneratorSource.currentTimeMillis() - idEpoch;
+        checkState(
+            timestamp < timestampMax,
+            "Cannot generate any more IDs as the lifetime of the generator has 
expired");
+        if (timestamp < lastTimestamp) {
+          throw new IllegalStateException(
+              "Clock walked backwards from "
+                  + lastTimestamp
+                  + " to "
+                  + timestamp
+                  + ", provide a monotonically increasing clock source");
+        }
+        needTimestamp = false;
+      }
+
+      long sequence;
+      if (lastTimestamp == timestamp) {
+        sequence = sequenceFromId(last);
+        if (sequence == sequenceMask) {
+          // last generated sequence for the current millisecond yielded the 
maximum value,
+          // spin-wait until the next millisecond
+          spinWaitSequence();
+          // Force re-fetching the timestamp
+          needTimestamp = true;
+          continue;
+        }
+        sequence++;
+      } else {
+        sequence = 0L;
+      }
+
+      holdForTest();
+
+      var id = (timestamp << timestampShift) | nodeIdPattern | sequence;
+
+      if (LAST_ID_UPDATER.compareAndSet(this, last, id)) {
+        return id;
+      }
+
+      spinWaitRace();
+      // Do not re-fetch the timestamp from the clock source (a bit faster)
+    }
+  }
+
+  @VisibleForTesting
+  void holdForTest() {}
+
+  @VisibleForTesting
+  void spinWaitSequence() {
+    try {
+      // Sleep for 0.5ms - no Thread.yield() or Thread.onSpinWait(), because 
those cause too much
+      // CPU load
+      Thread.sleep(0, 500_000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @VisibleForTesting
+  void spinWaitRace() {
+    Thread.onSpinWait();
+  }
+
+  @Override
+  public long timestampFromId(long id) {
+    return id >>> timestampShift;
+  }
+
+  @Override
+  public long timestampUtcFromId(long id) {
+    return timestampFromId(id) + idEpoch;
+  }
+
+  @Override
+  public long sequenceFromId(long id) {
+    return id & sequenceMask;
+  }
+
+  @Override
+  public long nodeFromId(long id) {
+    return (id >>> sequenceBits) & nodeMask;
+  }
+
+  @Override
+  public UUID idToTimeUuid(long id) {
+    var timestamp = timestampFromId(id);
+    var sequence = sequenceFromId(id);
+    var node = nodeFromId(id);
+
+    return new UUID(timeUuidMsb(timestamp), timeUuidLsb(sequence, node));
+  }
+
+  @Override
+  public long timeUuidToId(@Nonnull UUID uuid) {
+    checkArgument(
+        uuid.variant() == 2 && uuid.version() == 1, "Must be a version 1 / 
variant 2 UUID");
+    var ts = uuid.timestamp() - idEpoch;
+    var seq = uuid.clockSequence();
+    var node = uuid.node();
+    checkArgument(
+        ts > 0
+            && ts <= timestampMax
+            && seq >= 0
+            && seq <= sequenceMask
+            && node >= 0
+            && node <= nodeMask,
+        "TimeUUID contains values that cannot be condensed into a 
snowflake-ID");
+    return constructId(ts, seq, node);
+  }
+
+  @Override
+  public String describeId(long id) {
+    var ts = timestampFromId(id);
+    var seq = sequenceFromId(id);
+    var node = nodeFromId(id);
+    var tsUnixEpoch = ts + idEpoch;
+    var instant = Instant.ofEpochMilli(tsUnixEpoch);
+    var zone = ZoneOffset.systemDefault();
+    var local = LocalDateTime.ofInstant(instant, zone);
+    return format(
+        """
+        Snowflake-ID %d components
+                         timestamp : %d
+                              node : %d%s
+                          sequence : %d
+                    timestamp/Unix : %d (= timestamp + epoch offset)
+                 timestamp/instant : %s
+                   timestamp/local : %s %s
+                  generator offset : %d / %s
+        """,
+        id,
+        ts,
+        node,
+        (ts == 0L && seq == 0L) ? " (system ID for this node)" : "",
+        seq,
+        tsUnixEpoch,
+        instant,
+        local,
+        zone,
+        idEpoch,
+        Instant.ofEpochMilli(idEpoch));
+  }
+
+  @Override
+  public int timestampBits() {
+    return Long.numberOfTrailingZeros(timestampMax);
+  }
+
+  @Override
+  public int sequenceBits() {
+    return sequenceBits;
+  }
+
+  @Override
+  public int nodeIdBits() {
+    return 64 - Long.numberOfLeadingZeros(nodeMask);
+  }
+
+  @Override
+  public String idToString(long id) {
+    var ts = timestampFromId(id);
+    return Instant.ofEpochMilli(ts + idEpoch).toString()
+        + " ("
+        + ts
+        + "), sequence "
+        + sequenceFromId(id)
+        + ", node "
+        + nodeFromId(id);
+  }
+
+  @VisibleForTesting
+  static long timeUuidLsb(long sequence, long node) {
+    // LSB:
+    //  0xC000000000000000 variant
+    //  0x3FFF000000000000 clock_seq
+    //  0x0000FFFFFFFFFFFF node
+
+    return
+    // variant
+    0x8000000000000000L
+        // clock_seq
+        | ((sequence << 48) & 0x3FFF000000000000L)
+        // node
+        | (node & 0x0000FFFFFFFFFFFFL);
+  }
+
+  @VisibleForTesting
+  private long timeUuidMsb(long timestamp) {
+    return timeUuidMsbReal(timestamp + idEpoch);
+  }
+
+  @VisibleForTesting
+  static long timeUuidMsbReal(long timestamp) {
+    // MSB:
+    //  0xFFFFFFFF00000000 time_low
+    //  0x00000000FFFF0000 time_mid
+    //  0x000000000000F000 version
+    //  0x0000000000000FFF time_hi
+
+    return
+    // time_low
+    (timestamp << 32 & 0xFFFFFFFF00000000L)
+        |
+        // time_mid
+        ((timestamp >>> (32 - 16) & 0x00000000FFFF0000L))
+        |
+        // version
+        0x0000000000001000L
+        |
+        // time_hi
+        ((timestamp >>> 48) & 0x0000000000000FFFL);
+  }
+}
diff --git a/persistence/nosql/idgen/impl/src/main/resources/META-INF/beans.xml 
b/persistence/nosql/idgen/impl/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a297f1aa5
--- /dev/null
+++ b/persistence/nosql/idgen/impl/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee 
https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd";>
+    <!-- File required by Weld (used for testing), not by Quarkus -->
+</beans>
\ No newline at end of file
diff --git 
a/persistence/nosql/idgen/impl/src/main/resources/META-INF/services/org.apache.polaris.ids.spi.IdGeneratorFactory
 
b/persistence/nosql/idgen/impl/src/main/resources/META-INF/services/org.apache.polaris.ids.spi.IdGeneratorFactory
new file mode 100644
index 000000000..7cfb95353
--- /dev/null
+++ 
b/persistence/nosql/idgen/impl/src/main/resources/META-INF/services/org.apache.polaris.ids.spi.IdGeneratorFactory
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.polaris.ids.impl.SnowflakeIdGeneratorFactory
diff --git 
a/persistence/nosql/idgen/impl/src/test/java/org/apache/polaris/ids/impl/TestMonotonicClockImpl.java
 
b/persistence/nosql/idgen/impl/src/test/java/org/apache/polaris/ids/impl/TestMonotonicClockImpl.java
new file mode 100644
index 000000000..ef043d76e
--- /dev/null
+++ 
b/persistence/nosql/idgen/impl/src/test/java/org/apache/polaris/ids/impl/TestMonotonicClockImpl.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.impl;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.ids.mocks.MutableMonotonicClock;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@ExtendWith(SoftAssertionsExtension.class)
+@Timeout(value = 5, unit = MINUTES)
+public class TestMonotonicClockImpl {
+  @InjectSoftAssertions protected SoftAssertions soft;
+
+  @Test
+  public void simple() {
+    var systemNow = System.currentTimeMillis();
+
+    try (var monotonicClock =
+        new MutableMonotonicClock(systemNow, MILLISECONDS.toNanos(systemNow))) 
{
+      monotonicClock.start();
+      soft.assertThat(monotonicClock.currentTimeMillis()).isEqualTo(systemNow);
+    }
+  }
+
+  @Test
+  public void realClockMustNotGoBackwards() {
+    try (var monotonicClock = new MonotonicClockImpl()) {
+      monotonicClock.start();
+
+      var lastMicros = monotonicClock.currentTimeMicros();
+      var lastMillis = monotonicClock.currentTimeMillis();
+      var lastInstant = monotonicClock.currentInstant();
+      var lastNanos = monotonicClock.systemNanoTime();
+
+      // Run for 5 seconds so there is a real chance to catch a couple of 
"second wraps" and
+      // wall-clock changes.
+      var endAfter = lastNanos + SECONDS.toNanos(5);
+
+      while (true) {
+
+        var nanos = monotonicClock.systemNanoTime();
+        soft.assertThat(nanos).isGreaterThanOrEqualTo(lastNanos);
+
+        var micros = monotonicClock.currentTimeMicros();
+        soft.assertThat(micros).isGreaterThanOrEqualTo(lastMicros);
+
+        var millis = monotonicClock.currentTimeMillis();
+        soft.assertThat(millis).isGreaterThanOrEqualTo(lastMillis);
+
+        var instant = monotonicClock.currentInstant();
+        soft.assertThat(instant).isAfterOrEqualTo(lastInstant);
+
+        soft.assertAll();
+
+        lastMicros = micros;
+        lastMillis = millis;
+        lastInstant = instant;
+        lastNanos = nanos;
+
+        if (nanos > endAfter) {
+          break;
+        }
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(
+      longs = {
+        0L,
+        500L,
+        1_000L,
+        1_000_000L,
+        1_000_000_000L,
+        // wrap around to negative
+        Long.MAX_VALUE,
+        // wrap around to negative after 50ms
+        Long.MAX_VALUE - 50_000_000L,
+        // wrap around to negative after 150ms
+        Long.MAX_VALUE,
+        // "negative for" 50ms
+        -50_000_000L,
+        // "negative for" 150ms
+        -150_000_000L,
+        // "negative for" 500ms
+        -500_000_000L,
+        // "negative for" 500s
+        -500_00_000_000L,
+        // always negative
+        Long.MIN_VALUE + 100_000_000_000L,
+        Long.MIN_VALUE + 100_000_000L,
+        Long.MIN_VALUE + 100_000L,
+        Long.MIN_VALUE + 1000L,
+        Long.MIN_VALUE + 500L,
+        Long.MIN_VALUE + 1,
+        Long.MIN_VALUE
+      })
+  public void nanoSourceNegativePositive(long nanoOffset) {
+    var nano = nanoOffset;
+    var systemWall = 0L;
+    var realTimeWall = 0L;
+    var inst = Instant.EPOCH;
+
+    // MonotonicClockImpl not started, no need to close()
+    @SuppressWarnings("resource")
+    var monotonicClock = new MutableMonotonicClock(systemWall, nano);
+
+    soft.assertThat(monotonicClock)
+        .extracting(
+            MonotonicClock::currentTimeMillis,
+            MonotonicClock::currentTimeMicros,
+            MonotonicClock::currentInstant,
+            MonotonicClock::nanoTime)
+        .containsExactly(realTimeWall, instantToMicros(inst), inst, nano);
+
+    monotonicClock.tick();
+
+    soft.assertThat(monotonicClock)
+        .extracting(
+            MonotonicClock::currentTimeMillis,
+            MonotonicClock::currentTimeMicros,
+            MonotonicClock::currentInstant,
+            MonotonicClock::nanoTime)
+        .containsExactly(realTimeWall, instantToMicros(inst), inst, nano);
+
+    monotonicClock.tick();
+
+    // -- wall clock too slow
+    // Wall clock advanced by 100ms
+    // Real time advanced by 200ms
+    realTimeWall = 200;
+    nano = nanoOffset + MILLISECONDS.toNanos(realTimeWall) + 123456L;
+    systemWall = 100;
+    inst = Instant.ofEpochSecond(0, MILLISECONDS.toNanos(realTimeWall) + 
123456L);
+    monotonicClock.setNanoTime(nano);
+    monotonicClock.setCurrentTimeMillis(systemWall);
+
+    monotonicClock.tick();
+
+    soft.assertThat(monotonicClock)
+        .extracting(
+            MonotonicClock::currentTimeMillis,
+            MonotonicClock::currentTimeMicros,
+            MonotonicClock::currentInstant,
+            MonotonicClock::nanoTime)
+        .containsExactly(realTimeWall, instantToMicros(inst), inst, nano);
+
+    realTimeWall = 1400;
+    nano = nanoOffset + MILLISECONDS.toNanos(realTimeWall) + 234567L;
+    systemWall = 1400;
+    inst = Instant.ofEpochSecond(0, MILLISECONDS.toNanos(1400) + 234567L);
+    monotonicClock.setNanoTime(nano);
+    monotonicClock.setCurrentTimeMillis(systemWall);
+
+    monotonicClock.tick();
+
+    soft.assertThat(monotonicClock)
+        .extracting(
+            MonotonicClock::currentTimeMillis,
+            MonotonicClock::currentTimeMicros,
+            MonotonicClock::currentInstant,
+            MonotonicClock::nanoTime)
+        .containsExactly(realTimeWall, instantToMicros(inst), inst, nano);
+
+    // wall clock goes backwards
+    // wall = 200;
+    monotonicClock.setNanoTime(nano);
+    systemWall = 1000;
+    monotonicClock.setCurrentTimeMillis(systemWall);
+
+    monotonicClock.tick();
+
+    soft.assertThat(monotonicClock)
+        .extracting(
+            MonotonicClock::currentTimeMillis,
+            MonotonicClock::currentTimeMicros,
+            MonotonicClock::currentInstant,
+            MonotonicClock::nanoTime)
+        .containsExactly(realTimeWall, instantToMicros(inst), inst, nano);
+
+    // wall clock advances
+    realTimeWall = 2000;
+    systemWall = 2000;
+    nano = nanoOffset + MILLISECONDS.toNanos(realTimeWall) + 234567L;
+    inst = Instant.ofEpochSecond(0, MILLISECONDS.toNanos(realTimeWall) + 
234567L);
+    monotonicClock.setNanoTime(nano);
+    monotonicClock.setCurrentTimeMillis(systemWall);
+
+    monotonicClock.tick();
+
+    soft.assertThat(monotonicClock)
+        .extracting(
+            MonotonicClock::currentTimeMillis,
+            MonotonicClock::currentTimeMicros,
+            MonotonicClock::currentInstant,
+            MonotonicClock::nanoTime)
+        .containsExactly(realTimeWall, instantToMicros(inst), inst, nano);
+  }
+
+  @Test
+  public void currentInstantAndMillis() {
+    // Note: this test case emits FAKE "MonotonicClock tick loop is stalled" 
warnings!
+    // Ignore those warnings.
+
+    // MonotonicClockImpl not started, no need to close()
+    @SuppressWarnings("resource")
+    var monotonicClock = new MutableMonotonicClock(0L, 0L);
+
+    monotonicClock.tick();
+
+    soft.assertThat(monotonicClock.systemNanoTime()).isEqualTo(0L);
+    soft.assertThat(monotonicClock.currentTimeMicros()).isEqualTo(0L);
+    soft.assertThat(monotonicClock.currentTimeMillis()).isEqualTo(0L);
+    
soft.assertThat(monotonicClock.currentInstant().toEpochMilli()).isEqualTo(0L);
+
+    var nanos = 456111222333L;
+    var millis = TimeUnit.NANOSECONDS.toMillis(nanos);
+    monotonicClock.setCurrentTimeMillis(millis);
+    monotonicClock.setNanoTime(nanos);
+
+    monotonicClock.tick();
+
+    soft.assertThat(monotonicClock.systemNanoTime()).isEqualTo(nanos);
+    soft.assertThat(monotonicClock.currentTimeMicros())
+        .extracting(MICROSECONDS::toMillis)
+        .isEqualTo(millis);
+    soft.assertThat(monotonicClock.currentTimeMillis()).isEqualTo(millis);
+    
soft.assertThat(monotonicClock.currentInstant().toEpochMilli()).isEqualTo(millis);
+  }
+
+  @Test
+  public void strictlyMonotonicIfWallClockGoesBackwards() {
+    var adjustCalled = new AtomicBoolean();
+
+    // MonotonicClockImpl not started, no need to close()
+    @SuppressWarnings("resource")
+    var monotonicClock =
+        new MutableMonotonicClock() {
+          @Override
+          protected void afterAdjust() {
+            adjustCalled.set(true);
+          }
+        };
+
+    var initial = monotonicClock.currentTimeMillis();
+
+    // Begin 
---------------------------------------------------------------------
+    //
+    // Check that the monotonic clock advances
+
+    adjustCalled.set(false);
+
+    // Increment the nano-clock source by 1 millisecond
+    monotonicClock.advanceNanos(1, MILLISECONDS);
+
+    monotonicClock.tick();
+
+    // Test case:
+    var afterWork1 = monotonicClock.currentTimeMillis();
+    soft.assertThat(adjustCalled).isFalse();
+    soft.assertThat(afterWork1).isGreaterThan(initial);
+
+    // <<End
+
+    // Begin 
---------------------------------------------------------------------
+    //
+    // Check that the monotonic clock fetches the updated wall clock, but 
disregards it as it
+    // went backwards
+
+    adjustCalled.set(false);
+
+    // Let the wall clock go backwards
+    monotonicClock.advanceCurrentTimeMillis(-10, MILLISECONDS);
+    // Increment the nano-clock source by 1 second
+    monotonicClock.advanceNanos(1, SECONDS);
+
+    monotonicClock.tick();
+
+    // Test case:
+    var afterWork2 = monotonicClock.currentTimeMillis();
+    soft.assertThat(adjustCalled).isFalse();
+    soft.assertThat(afterWork2).isGreaterThan(afterWork1);
+
+    // <<End
+
+    // Begin 
---------------------------------------------------------------------
+    //
+    // Check that the monotonic clock fetches the updated wall clock, but this 
time uses it, as
+    // it went forwards (after the last remembered wall clock)
+
+    adjustCalled.set(false);
+
+    // Let the wall clock go forwards to trigger the wall clock sync
+    monotonicClock.advanceCurrentTimeMillis(5, SECONDS);
+    // Increment the nano-clock source by 1 second
+    monotonicClock.advanceNanos(1, SECONDS);
+
+    monotonicClock.tick();
+
+    // Test case:
+    var afterWork3 = monotonicClock.currentTimeMillis();
+    soft.assertThat(adjustCalled).isTrue();
+    soft.assertThat(afterWork3).isGreaterThan(afterWork2);
+
+    // <<End
+  }
+
+  private static long instantToMicros(Instant inst) {
+    var microsFromSecond = SECONDS.toMicros(inst.getEpochSecond());
+    var microsFromNanoPart = NANOSECONDS.toMicros(inst.getNano());
+    return microsFromSecond + microsFromNanoPart;
+  }
+}
diff --git 
a/persistence/nosql/idgen/impl/src/test/java/org/apache/polaris/ids/impl/TestSnowflakeIdGeneratorImpl.java
 
b/persistence/nosql/idgen/impl/src/test/java/org/apache/polaris/ids/impl/TestSnowflakeIdGeneratorImpl.java
new file mode 100644
index 000000000..0b0503c62
--- /dev/null
+++ 
b/persistence/nosql/idgen/impl/src/test/java/org/apache/polaris/ids/impl/TestSnowflakeIdGeneratorImpl.java
@@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static 
org.apache.polaris.ids.api.SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS;
+import static 
org.apache.polaris.ids.api.SnowflakeIdGenerator.DEFAULT_SEQUENCE_BITS;
+import static 
org.apache.polaris.ids.api.SnowflakeIdGenerator.DEFAULT_TIMESTAMP_BITS;
+import static org.apache.polaris.ids.api.SnowflakeIdGenerator.ID_EPOCH_MILLIS;
+import static org.apache.polaris.ids.impl.SnowflakeIdGeneratorImpl.timeUuidLsb;
+import static 
org.apache.polaris.ids.impl.SnowflakeIdGeneratorImpl.timeUuidMsbReal;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.ids.spi.IdGeneratorSource;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(SoftAssertionsExtension.class)
+@Timeout(value = 5, unit = MINUTES)
+public class TestSnowflakeIdGeneratorImpl {
+  public static final IdGeneratorSource ID_GENERATOR_SOURCE_CONSTANT =
+      new IdGeneratorSource() {
+        @Override
+        public long currentTimeMillis() {
+          return ID_EPOCH_MILLIS;
+        }
+
+        @Override
+        public int nodeId() {
+          return 0;
+        }
+      };
+  @InjectSoftAssertions protected SoftAssertions soft;
+
+  protected MonotonicClock clock;
+
+  @SuppressWarnings("resource")
+  @BeforeEach
+  protected void setUp() {
+    clock = MonotonicClockImpl.newDefaultInstance();
+  }
+
+  @AfterEach
+  protected void tearDown() {
+    clock.close();
+  }
+
+  @Test
+  public void validArgs() {
+    soft.assertThatCode(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(Map.of(), idGeneratorSource(0, 
clock::currentTimeMillis)))
+        .doesNotThrowAnyException();
+    soft.assertThatCode(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(
+                        Map.of(),
+                        idGeneratorSource(
+                            (1 << DEFAULT_NODE_ID_BITS) - 1, 
clock::currentTimeMillis)))
+        .doesNotThrowAnyException();
+  }
+
+  @Test
+  public void invalidArgs() {
+    var validClock = (LongSupplier) () -> ID_EPOCH_MILLIS;
+
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(Map.of(), idGeneratorSource(-1, () -> 
0L)))
+        .withMessage(
+            "Clock returns a timestamp 1970-01-01T00:00:00Z less than the 
configured epoch 2025-03-01T00:00:00Z, nodeId -1 out of range [0..1024[");
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(Map.of(), idGeneratorSource(-1, 
validClock)))
+        .withMessage("nodeId -1 out of range [0..1024[");
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(
+                        Map.of(), idGeneratorSource(1 << DEFAULT_NODE_ID_BITS, 
validClock)))
+        .withMessage("nodeId 1024 out of range [0..1024[");
+
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(
+                        31,
+                        DEFAULT_SEQUENCE_BITS,
+                        DEFAULT_NODE_ID_BITS,
+                        ID_EPOCH_MILLIS,
+                        ID_GENERATOR_SOURCE_CONSTANT))
+        .withMessage("Sum of timestampBits + nodeBits + sequenceBits must be 
== 63");
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(
+                        DEFAULT_TIMESTAMP_BITS,
+                        4,
+                        DEFAULT_NODE_ID_BITS,
+                        ID_EPOCH_MILLIS,
+                        ID_GENERATOR_SOURCE_CONSTANT))
+        .withMessage(
+            "value of nodeBits 10 or sequenceBits 4 or timestampBits 41 is too 
low or too high, Sum of timestampBits + nodeBits + sequenceBits must be == 63");
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(
+                        DEFAULT_TIMESTAMP_BITS,
+                        DEFAULT_SEQUENCE_BITS,
+                        4,
+                        ID_EPOCH_MILLIS,
+                        ID_GENERATOR_SOURCE_CONSTANT))
+        .withMessage("Sum of timestampBits + nodeBits + sequenceBits must be 
== 63");
+
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(
+                        64,
+                        DEFAULT_SEQUENCE_BITS,
+                        DEFAULT_NODE_ID_BITS,
+                        ID_EPOCH_MILLIS,
+                        ID_GENERATOR_SOURCE_CONSTANT))
+        .withMessage(
+            "value of nodeBits 10 or sequenceBits 12 or timestampBits 64 is 
too low or too high, Sum of timestampBits + nodeBits + sequenceBits must be == 
63");
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(
+                        DEFAULT_TIMESTAMP_BITS,
+                        64,
+                        DEFAULT_NODE_ID_BITS,
+                        ID_EPOCH_MILLIS,
+                        ID_GENERATOR_SOURCE_CONSTANT))
+        .withMessage(
+            "value of nodeBits 10 or sequenceBits 64 or timestampBits 41 is 
too low or too high, Sum of timestampBits + nodeBits + sequenceBits must be == 
63");
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(
+                        DEFAULT_TIMESTAMP_BITS,
+                        DEFAULT_SEQUENCE_BITS,
+                        64,
+                        ID_EPOCH_MILLIS,
+                        ID_GENERATOR_SOURCE_CONSTANT))
+        .withMessage(
+            "value of nodeBits 64 or sequenceBits 12 or timestampBits 41 is 
too low or too high, Sum of timestampBits + nodeBits + sequenceBits must be == 
63");
+
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(
+                        DEFAULT_TIMESTAMP_BITS,
+                        5,
+                        DEFAULT_NODE_ID_BITS,
+                        ID_EPOCH_MILLIS,
+                        ID_GENERATOR_SOURCE_CONSTANT))
+        .withMessage("Sum of timestampBits + nodeBits + sequenceBits must be 
== 63");
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                new SnowflakeIdGeneratorFactory()
+                    .buildIdGenerator(
+                        DEFAULT_TIMESTAMP_BITS,
+                        4,
+                        18,
+                        ID_EPOCH_MILLIS,
+                        ID_GENERATOR_SOURCE_CONSTANT))
+        .withMessage(
+            "value of nodeBits 18 or sequenceBits 4 or timestampBits 41 is too 
low or too high");
+  }
+
+  @Test
+  public void clockBackwards() {
+    var clock = new AtomicLong(ID_EPOCH_MILLIS + 100);
+
+    var nodeId = 42;
+
+    var ids = new HashSet<Long>();
+
+    var impl =
+        new SnowflakeIdGeneratorFactory()
+            .buildIdGenerator(Map.of(), idGeneratorSource(nodeId, clock::get));
+
+    soft.assertThatCode(() -> 
soft.assertThat(ids.add(impl.generateId())).isTrue())
+        .doesNotThrowAnyException();
+    soft.assertThatCode(() -> 
soft.assertThat(ids.add(impl.generateId())).isTrue())
+        .doesNotThrowAnyException();
+    clock.addAndGet(20);
+    soft.assertThatCode(() -> 
soft.assertThat(ids.add(impl.generateId())).isTrue())
+        .doesNotThrowAnyException();
+    soft.assertThatCode(() -> 
soft.assertThat(ids.add(impl.generateId())).isTrue())
+        .doesNotThrowAnyException();
+    clock.addAndGet(-1);
+    soft.assertThatIllegalStateException()
+        .isThrownBy(impl::generateId)
+        .withMessage(
+            "Clock walked backwards from 120 to 119, provide a monotonically 
increasing clock source");
+    clock.addAndGet(1);
+    soft.assertThatCode(() -> 
soft.assertThat(ids.add(impl.generateId())).isTrue())
+        .doesNotThrowAnyException();
+  }
+
+  /**
+   * Tests concurrency in {@link SnowflakeIdGeneratorImpl}, forcing CAS update 
races on the {@code
+   * lastId} field and asserting that {@link 
SnowflakeIdGeneratorImpl#spinWaitRace()} has been
+   * called.
+   */
+  @Test
+  @Timeout(value = 5, unit = MINUTES)
+  public void concurrency() throws Exception {
+    var clock = new AtomicLong(ID_EPOCH_MILLIS + 100);
+
+    var nodeId = 42;
+
+    var holdEnterT1 = new Semaphore(0);
+    var holdEnterT2 = new Semaphore(0);
+    var holdWaitT1 = new Semaphore(0);
+    var holdWaitT2 = new Semaphore(0);
+    var spinEnterT2 = new Semaphore(0);
+    var spinWaitT2 = new Semaphore(0);
+
+    var syncGenerate = new CountDownLatch(1);
+    var syncMain = new CountDownLatch(2);
+    var thread1 = new AtomicReference<Thread>();
+    var impl =
+        new SnowflakeIdGeneratorImpl(idGeneratorSource(nodeId, clock::get)) {
+          @Override
+          void holdForTest() {
+            var isT1 = thread1.get() == Thread.currentThread();
+
+            (isT1 ? holdEnterT1 : holdEnterT2).release();
+            (isT1 ? holdWaitT1 : holdWaitT2).acquireUninterruptibly();
+          }
+
+          @Override
+          void spinWaitRace() {
+            var isT1 = thread1.get() == Thread.currentThread();
+
+            checkState(!isT1);
+
+            spinEnterT2.release();
+            spinWaitT2.acquireUninterruptibly();
+          }
+        };
+
+    try (var executorService = Executors.newFixedThreadPool(2)) {
+
+      var t1 =
+          executorService.submit(
+              () -> {
+                thread1.set(Thread.currentThread());
+
+                syncMain.countDown();
+                syncGenerate.await();
+
+                return impl.generateId();
+              });
+
+      var t2 =
+          executorService.submit(
+              () -> {
+                syncMain.countDown();
+                syncGenerate.await();
+
+                return impl.generateId();
+              });
+
+      // Wait for both threads to reach the same state
+      syncMain.await();
+      // Let both threads continue
+      syncGenerate.countDown();
+
+      // Wait until both threads reached the "holdForTest()" function after 
loading the 'lastId'
+      // field with the same value
+      holdEnterT1.acquireUninterruptibly();
+      holdEnterT2.acquireUninterruptibly();
+
+      // Let T1 continue (and finish) - updates 'lastId' field
+      holdWaitT1.release();
+      // Get T1's result
+      var id1 = t1.get();
+
+      // Let T2 continue - it will race updating the 'lastId' field
+      holdWaitT2.release();
+
+      // Wait until T2 enters "spinWait()"
+      spinEnterT2.acquireUninterruptibly();
+      // Let T2 retry
+      spinWaitT2.release();
+
+      // Wait until T2 reached "holdForTest()" after hitting the 'lastId' 
update race
+      holdEnterT2.acquireUninterruptibly();
+      // Let T2 continue - next iteration should not race
+      holdWaitT2.release();
+
+      var id2 = t2.get();
+
+      soft.assertThat(id1).isNotEqualTo(id2);
+    }
+  }
+
+  @Test
+  public void manyThreads() throws Exception {
+    var threads = Runtime.getRuntime().availableProcessors() * 2;
+    var nodeId = 42;
+    var impl =
+        new SnowflakeIdGeneratorFactory()
+            .buildIdGenerator(Map.of(), idGeneratorSource(nodeId, 
clock::currentTimeMillis));
+
+    var sync = new CountDownLatch(threads);
+    var start = new CountDownLatch(1);
+    var done = new CountDownLatch(threads);
+    var finish = new CountDownLatch(1);
+    var numIdsPerThread = 5000;
+
+    try (var executorService = Executors.newFixedThreadPool(threads)) {
+      var ids = ConcurrentHashMap.newKeySet(numIdsPerThread * threads * 2);
+      var futures = new ArrayList<Future<?>>(threads);
+
+      for (int i = 0; i < threads; i++) {
+        futures.add(
+            executorService.submit(
+                () -> {
+                  sync.countDown();
+                  start.await();
+
+                  var localIds = new HashSet<Long>(numIdsPerThread * 2);
+                  try {
+                    for (int n = 0; n < numIdsPerThread; n++) {
+                      localIds.add(impl.generateId());
+                    }
+                  } finally {
+                    done.countDown();
+                    finish.await();
+                  }
+
+                  ids.addAll(localIds);
+
+                  return null;
+                }));
+      }
+
+      // Wait until all threads have started
+      sync.await();
+      // Let threads start
+      start.countDown();
+
+      // Wait until all threads have started
+      done.await();
+      // Let threads start
+      finish.countDown();
+
+      for (Future<?> future : futures) {
+        future.get();
+      }
+
+      soft.assertThat(ids).hasSize(numIdsPerThread * threads);
+    }
+  }
+
+  @Test
+  public void maxIdsPerMillisecondAtEpochOffset() {
+    var clock = (LongSupplier) () -> ID_EPOCH_MILLIS;
+
+    var nodeId = 42;
+
+    var impl =
+        new SnowflakeIdGeneratorImpl(idGeneratorSource(nodeId, clock)) {
+          @Override
+          void spinWaitSequence() {
+            throw new RuntimeException("Moo");
+          }
+
+          @Override
+          void spinWaitRace() {
+            throw new RuntimeException("Moo");
+          }
+        };
+
+    // Implementation detail: the 1st sequence if the timestamp is equal to 
EPOCH_OFFSET is 1.
+    // All other initial timestamps start with sequence == 0.
+    var maxPerMillisecond = 4095;
+
+    var ids = new long[maxPerMillisecond];
+    for (var i = 0; i < maxPerMillisecond; i++) {
+      ids[i] = impl.generateId();
+    }
+
+    soft.assertThatThrownBy(impl::generateId)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage("Moo");
+    soft.assertThatThrownBy(impl::generateId)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage("Moo");
+
+    var expect = ((long) nodeId) << DEFAULT_SEQUENCE_BITS | 1L;
+    for (var i = 0; i < ids.length; i++) {
+      var id = ids[i];
+      soft.assertThat(id).describedAs("index %d", i).isEqualTo(expect);
+      soft.assertThat(impl.nodeFromId(id)).isEqualTo(nodeId);
+      soft.assertThat(impl.timestampFromId(id)).isEqualTo(0L);
+      soft.assertThat(impl.sequenceFromId(id)).isEqualTo(i + 1);
+      soft.assertThat(impl.idToString(id))
+          .isEqualTo(
+              format(
+                  "%s (%d), sequence %d, node %d",
+                  Instant.ofEpochMilli(ID_EPOCH_MILLIS), 0, i + 1, nodeId));
+      expect++;
+      if (i % 10 == 0) {
+        soft.assertAll();
+      }
+    }
+  }
+
+  @Test
+  public void maxIdsPerMillisecondAtNowWithMutableClock() {
+    var clockSource = new AtomicLong(ID_EPOCH_MILLIS + 
TimeUnit.DAYS.toMillis(365));
+    var clock = (LongSupplier) clockSource::get;
+    var initialTimestamp = clock.getAsLong() - ID_EPOCH_MILLIS;
+
+    var nodeId = 42;
+
+    var impl =
+        new SnowflakeIdGeneratorImpl(idGeneratorSource(nodeId, clock)) {
+          @Override
+          void spinWaitSequence() {
+            clockSource.incrementAndGet();
+          }
+        };
+
+    for (var millis = 0; millis < 5; millis++) {
+      var expect =
+          ((initialTimestamp + millis) << (DEFAULT_SEQUENCE_BITS + 
DEFAULT_NODE_ID_BITS))
+              |
+              //
+              ((long) nodeId) << DEFAULT_SEQUENCE_BITS;
+
+      for (var j = 0; j < 4096; j++) {
+        var id = impl.generateId();
+        var uuid = impl.idToTimeUuid(id);
+
+        
soft.assertThat(impl.nodeFromId(id)).isEqualTo(nodeId).isEqualTo(uuid.node());
+        soft.assertThat(impl.timestampFromId(id))
+            .isEqualTo(initialTimestamp + millis)
+            .isEqualTo(uuid.timestamp() - ID_EPOCH_MILLIS);
+        soft.assertThat(uuid).extracting(UUID::variant, 
UUID::version).containsExactly(2, 1);
+        soft.assertThat(impl.timeUuidToId(uuid)).isEqualTo(id);
+        
soft.assertThat(impl.timestampUtcFromId(id)).isEqualTo(uuid.timestamp());
+        
soft.assertThat(impl.sequenceFromId(id)).isEqualTo(j).isEqualTo(uuid.clockSequence());
+        soft.assertThat(impl.idToString(id))
+            .isEqualTo(
+                format(
+                    "%s (%d), sequence %d, node %d",
+                    Instant.ofEpochMilli(ID_EPOCH_MILLIS + initialTimestamp + 
millis),
+                    initialTimestamp + millis,
+                    j,
+                    nodeId));
+        soft.assertThat(id).describedAs("millis %d - seq %d", millis, 
j).isEqualTo(expect);
+        expect++;
+        if (j % 10 == 0) {
+          soft.assertAll();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void miscUuid() {
+    var clockSource = new AtomicLong(ID_EPOCH_MILLIS + 
TimeUnit.DAYS.toMillis(365));
+    var clock = (LongSupplier) clockSource::get;
+
+    var nodeId = 42;
+
+    var impl =
+        new SnowflakeIdGeneratorImpl(idGeneratorSource(nodeId, clock)) {
+          @Override
+          void spinWaitSequence() {
+            clockSource.incrementAndGet();
+          }
+        };
+
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(() -> impl.timeUuidToId(UUID.randomUUID()))
+        .withMessage("Must be a version 1 / variant 2 UUID");
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(() -> 
impl.timeUuidToId(UUID.nameUUIDFromBytes("foobar".getBytes(UTF_8))))
+        .withMessage("Must be a version 1 / variant 2 UUID");
+
+    long tsUuidHighest = 0xFFFFFFFFFFFFFFFL;
+    long seqUuidHighest = 0xFFFFFFFFFFFFL;
+    long nodeUuidHighest = 0x3FFF;
+
+    soft.assertThatIllegalArgumentException()
+        .isThrownBy(
+            () ->
+                impl.timeUuidToId(
+                    new UUID(
+                        timeUuidMsbReal(tsUuidHighest),
+                        timeUuidLsb(seqUuidHighest, nodeUuidHighest))))
+        .withMessage("TimeUUID contains values that cannot be condensed into a 
snowflake-ID");
+
+    soft.assertThatCode(
+            () ->
+                impl.timeUuidToId(
+                    new UUID(
+                        timeUuidMsbReal((1L << DEFAULT_TIMESTAMP_BITS) - 1),
+                        timeUuidLsb(
+                            (1L << DEFAULT_SEQUENCE_BITS) - 1, (1L << 
DEFAULT_NODE_ID_BITS) - 1))))
+        .doesNotThrowAnyException();
+  }
+
+  @Test
+  public void maxIdsPerMillisecondAtNowWithRealClock() {
+    var nodeId = 42;
+
+    var initialTimestamp = clock.currentTimeMillis() - ID_EPOCH_MILLIS;
+    var impl =
+        new SnowflakeIdGeneratorFactory()
+            .buildIdGenerator(Map.of(), idGeneratorSource(nodeId, 
clock::currentTimeMillis));
+
+    var ids = new HashSet<Long>();
+    for (var i = 0; i < 10 * 4096; i++) {
+
+      var id = impl.generateId();
+      soft.assertThat(ids.add(id)).isTrue();
+
+      var uuid = impl.idToTimeUuid(id);
+
+      
soft.assertThat(impl.nodeFromId(id)).isEqualTo(nodeId).isEqualTo(uuid.node());
+      soft.assertThatCode(
+              () -> {
+                assertThat(uuid.node()).isEqualTo(nodeId);
+                assertThat(uuid.timestamp())
+                    .isGreaterThan(ID_EPOCH_MILLIS)
+                    .isEqualTo(impl.timestampUtcFromId(id));
+                
assertThat(uuid.clockSequence()).isGreaterThanOrEqualTo(0).isLessThan(4096);
+                assertThat(uuid.variant()).isEqualTo(2);
+                assertThat(uuid.version()).isEqualTo(1);
+              })
+          .doesNotThrowAnyException();
+      soft.assertThat(impl.timeUuidToId(uuid)).isEqualTo(id);
+      
soft.assertThat(impl.timestampFromId(id)).isGreaterThanOrEqualTo(initialTimestamp);
+      soft.assertThat(impl.sequenceFromId(id))
+          .isGreaterThanOrEqualTo(0)
+          .isLessThan(4096)
+          .isEqualTo(uuid.clockSequence());
+      soft.assertThat(impl.idToString(id))
+          .isEqualTo(
+              format(
+                  "%s (%d), sequence %d, node %d",
+                  Instant.ofEpochMilli(impl.timestampFromId(id) + 
ID_EPOCH_MILLIS),
+                  impl.timestampFromId(id),
+                  impl.sequenceFromId(id),
+                  nodeId));
+
+      if (i % 10 == 0) {
+        soft.assertAll();
+      }
+    }
+  }
+
+  @Test
+  public void validationCallback() {
+    var nodeId = 42;
+
+    var validationValue = new AtomicBoolean(true);
+
+    var impl =
+        new SnowflakeIdGeneratorFactory()
+            .buildIdGenerator(
+                Map.of(),
+                new IdGeneratorSource() {
+                  @Override
+                  public int nodeId() {
+                    return validationValue.get() ? nodeId : -1;
+                  }
+
+                  @Override
+                  public long currentTimeMillis() {
+                    return clock.currentTimeMillis();
+                  }
+                });
+
+    soft.assertThatCode(impl::generateId).doesNotThrowAnyException();
+
+    validationValue.set(false);
+
+    soft.assertThatIllegalStateException()
+        .isThrownBy(impl::generateId)
+        .withMessage("Cannot generate a new ID, shutting down?");
+  }
+
+  static IdGeneratorSource idGeneratorSource(int nodeId, LongSupplier clock) {
+    return new IdGeneratorSource() {
+      @Override
+      public int nodeId() {
+        return nodeId;
+      }
+
+      @Override
+      public long currentTimeMillis() {
+        return clock.getAsLong();
+      }
+    };
+  }
+}
diff --git a/persistence/nosql/idgen/impl/src/test/resources/logback-test.xml 
b/persistence/nosql/idgen/impl/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..4a4d9a629
--- /dev/null
+++ b/persistence/nosql/idgen/impl/src/test/resources/logback-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration debug="false">
+  <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"/>
+  <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <root level="${test.log.level:-INFO}">
+    <appender-ref ref="console"/>
+  </root>
+</configuration>
diff --git a/persistence/nosql/idgen/mocks/build.gradle.kts 
b/persistence/nosql/idgen/mocks/build.gradle.kts
new file mode 100644
index 000000000..fc886dc83
--- /dev/null
+++ b/persistence/nosql/idgen/mocks/build.gradle.kts
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description = "Polaris ID generation mocks for testing"
+
+dependencies {
+  api(project(":polaris-idgen-api"))
+  api(project(":polaris-idgen-spi"))
+  api(project(":polaris-idgen-impl"))
+
+  implementation(libs.guava)
+  implementation(libs.slf4j.api)
+
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.inject.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+
+  implementation(platform(libs.jackson.bom))
+  implementation("com.fasterxml.jackson.core:jackson-databind")
+}
diff --git 
a/persistence/nosql/idgen/mocks/src/main/java/org/apache/polaris/ids/mocks/MutableMonotonicClock.java
 
b/persistence/nosql/idgen/mocks/src/main/java/org/apache/polaris/ids/mocks/MutableMonotonicClock.java
new file mode 100644
index 000000000..89433733d
--- /dev/null
+++ 
b/persistence/nosql/idgen/mocks/src/main/java/org/apache/polaris/ids/mocks/MutableMonotonicClock.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.mocks;
+
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import jakarta.enterprise.inject.Specializes;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.polaris.ids.impl.MonotonicClockImpl;
+
+@Specializes
+public class MutableMonotonicClock extends MonotonicClockImpl {
+  private final AtomicLong nanoTime = new AtomicLong();
+  private final AtomicLong currentTimeMillis = new AtomicLong();
+
+  public MutableMonotonicClock() {
+    this(System.currentTimeMillis(), System.nanoTime());
+  }
+
+  public MutableMonotonicClock(long currentTimeMillis, long nanoTime) {
+    super(false);
+    this.currentTimeMillis.set(currentTimeMillis);
+    this.nanoTime.set(nanoTime);
+    setup();
+  }
+
+  @CanIgnoreReturnValue
+  public MutableMonotonicClock setCurrentTimeMillis(long currentTimeMillis) {
+    this.currentTimeMillis.set(currentTimeMillis);
+    return this;
+  }
+
+  @CanIgnoreReturnValue
+  public MutableMonotonicClock setNanoTime(long nanoTime) {
+    this.nanoTime.set(nanoTime);
+    return this;
+  }
+
+  @CanIgnoreReturnValue
+  public MonotonicClockImpl advanceBoth(long time, TimeUnit unit) {
+    nanoTime.addAndGet(unit.toNanos(time));
+    currentTimeMillis.addAndGet(unit.toMillis(time));
+    return this;
+  }
+
+  @CanIgnoreReturnValue
+  public MonotonicClockImpl advanceBoth(Duration duration) {
+    nanoTime.addAndGet(duration.toNanos());
+    currentTimeMillis.addAndGet(duration.toMillis());
+    return this;
+  }
+
+  @CanIgnoreReturnValue
+  public MonotonicClockImpl advanceNanos(long time, TimeUnit unit) {
+    nanoTime.addAndGet(unit.toNanos(time));
+    return this;
+  }
+
+  @CanIgnoreReturnValue
+  public MonotonicClockImpl advanceNanos(Duration duration) {
+    nanoTime.addAndGet(duration.toNanos());
+    return this;
+  }
+
+  @CanIgnoreReturnValue
+  public MonotonicClockImpl advanceCurrentTimeMillis(long time, TimeUnit unit) 
{
+    currentTimeMillis.addAndGet(unit.toMillis(time));
+    return this;
+  }
+
+  @CanIgnoreReturnValue
+  public MonotonicClockImpl advanceCurrentTimeMillis(Duration duration) {
+    currentTimeMillis.addAndGet(duration.toMillis());
+    return this;
+  }
+
+  @Override
+  public long systemCurrentTimeMillis() {
+    return currentTimeMillis.get();
+  }
+
+  @Override
+  public long systemNanoTime() {
+    return nanoTime.get();
+  }
+}
diff --git 
a/persistence/nosql/idgen/mocks/src/main/resources/META-INF/beans.xml 
b/persistence/nosql/idgen/mocks/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a297f1aa5
--- /dev/null
+++ b/persistence/nosql/idgen/mocks/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee 
https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd";>
+    <!-- File required by Weld (used for testing), not by Quarkus -->
+</beans>
\ No newline at end of file
diff --git a/persistence/nosql/idgen/spi/build.gradle.kts 
b/persistence/nosql/idgen/spi/build.gradle.kts
new file mode 100644
index 000000000..94ac6d352
--- /dev/null
+++ b/persistence/nosql/idgen/spi/build.gradle.kts
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description = "Polaris ID generation SPI"
+
+dependencies {
+  implementation(project(":polaris-idgen-api"))
+
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.inject.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+
+  compileOnly(libs.smallrye.config.core)
+  compileOnly(platform(libs.quarkus.bom))
+  compileOnly("io.quarkus:quarkus-core")
+
+  annotationProcessor(project(":polaris-immutables", configuration = 
"processor"))
+
+  implementation(platform(libs.jackson.bom))
+  implementation("com.fasterxml.jackson.core:jackson-databind")
+}
diff --git 
a/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorFactory.java
 
b/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorFactory.java
new file mode 100644
index 000000000..767d9fd53
--- /dev/null
+++ 
b/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.spi;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.polaris.ids.api.IdGenerator;
+
+/** Provides values for ID generators, usually provided by {@code NodeLease} 
implementations. */
+public interface IdGeneratorFactory<I extends IdGenerator> {
+  String name();
+
+  void validateParameters(Map<String, String> params, IdGeneratorSource 
idGeneratorSource);
+
+  I buildIdGenerator(Map<String, String> params, IdGeneratorSource 
idGeneratorSource);
+
+  I buildSystemIdGenerator(Map<String, String> params);
+
+  static IdGeneratorFactory<?> lookupFactory(String name) {
+    for (IdGeneratorFactory<?> factory : 
ServiceLoader.load(IdGeneratorFactory.class)) {
+      if (factory.name().equals(name)) {
+        return factory;
+      }
+    }
+    throw new IllegalArgumentException("No IdGeneratorFactory found for name " 
+ name);
+  }
+}
diff --git 
a/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorSource.java
 
b/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorSource.java
new file mode 100644
index 000000000..a84b19f1b
--- /dev/null
+++ 
b/persistence/nosql/idgen/spi/src/main/java/org/apache/polaris/ids/spi/IdGeneratorSource.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.ids.spi;
+
+/** Provides values for ID generators, usually provided by {@code NodeLease} 
implementations. */
+public interface IdGeneratorSource {
+  /** Returns the node ID if the node is active/valid or {@code -1}. */
+  int nodeId();
+
+  long currentTimeMillis();
+}

Reply via email to