This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new 5144ad9 Update Paimon core to 1.0-SNAPSHOT
5144ad9 is described below
commit 5144ad9087e38a3165ae3b160560f1edd2b29156
Author: Jingsong <[email protected]>
AuthorDate: Tue Nov 26 15:18:25 2024 +0800
Update Paimon core to 1.0-SNAPSHOT
fix
---
.../org/apache/paimon/trino/TrinoMetadata.java | 35 +++++++------
.../trino/TrinoNodePartitioningProvider.java | 10 +---
.../apache/paimon/trino/TrinoPageSinkProvider.java | 13 ++---
.../paimon/trino/TrinoPageSourceProvider.java | 2 +-
.../paimon/trino/TrinoPartitioningHandle.java | 16 +-----
.../org/apache/paimon/trino/TrinoSplitManager.java | 2 +-
.../apache/paimon/trino/TrinoTableOptionUtils.java | 3 --
.../apache/paimon/trino/catalog/TrinoCatalog.java | 61 +++++++++-------------
.../paimon/trino/TestTrinoPartitioningHandle.java | 4 +-
pom.xml | 2 +-
10 files changed, 55 insertions(+), 93 deletions(-)
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
index 359bad3..8fac45c 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
@@ -93,8 +93,6 @@ import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
-import static org.apache.paimon.table.BucketMode.FIXED;
-import static org.apache.paimon.table.BucketMode.UNAWARE;
import static org.apache.paimon.trino.TrinoColumnHandle.TRINO_ROW_ID_NAME;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -124,19 +122,18 @@ public class TrinoMetadata implements ConnectorMetadata {
FileStoreTable storeTable = (FileStoreTable) table;
BucketMode bucketMode = storeTable.bucketMode();
switch (bucketMode) {
- case FIXED:
+ case HASH_FIXED:
try {
return Optional.of(
new ConnectorTableLayout(
new TrinoPartitioningHandle(
- InstantiationUtil.serializeObject(storeTable.schema()),
- FIXED),
+ InstantiationUtil.serializeObject(storeTable.schema())),
storeTable.schema().bucketKeys(),
false));
} catch (IOException e) {
throw new RuntimeException(e);
}
- case UNAWARE:
+ case BUCKET_UNAWARE:
return Optional.empty();
default:
throw new IllegalArgumentException("Unknown table bucket mode: " + bucketMode);
@@ -230,7 +227,7 @@ public class TrinoMetadata implements ConnectorMetadata {
}
FileStoreTable storeTable = (FileStoreTable) table;
BucketMode bucketMode = storeTable.bucketMode();
- if (bucketMode != FIXED) {
+ if (bucketMode != BucketMode.HASH_FIXED) {
throw new IllegalArgumentException("Unsupported table bucket mode: " + bucketMode);
}
Set<String> pkSet = new HashSet<>(table.primaryKeys());
@@ -252,13 +249,13 @@ public class TrinoMetadata implements ConnectorMetadata {
}
FileStoreTable storeTable = (FileStoreTable) table;
BucketMode bucketMode = storeTable.bucketMode();
- if (bucketMode != FIXED) {
+ if (bucketMode != BucketMode.HASH_FIXED) {
throw new IllegalArgumentException("Unsupported table bucket mode: " + bucketMode);
}
try {
return Optional.of(
new TrinoPartitioningHandle(
- InstantiationUtil.serializeObject(storeTable.schema()), FIXED));
+ InstantiationUtil.serializeObject(storeTable.schema())));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -282,7 +279,12 @@ public class TrinoMetadata implements ConnectorMetadata {
@Override
public boolean schemaExists(ConnectorSession session, String schemaName) {
catalog.initSession(session);
- return catalog.databaseExists(schemaName);
+ try {
+ catalog.getDatabase(schemaName);
+ return true;
+ } catch (Catalog.DatabaseNotExistException e) {
+ return false;
+ }
}
@Override
@@ -427,11 +429,14 @@ public class TrinoMetadata implements ConnectorMetadata {
SchemaTableName tableName,
Map<String, String> dynamicOptions) {
catalog.initSession(session);
- return catalog.tableExists(
- Identifier.create(tableName.getSchemaName(), tableName.getTableName()))
- ? new TrinoTableHandle(
- tableName.getSchemaName(), tableName.getTableName(), dynamicOptions)
- : null;
+ try {
+ catalog.getTable(
+ Identifier.create(tableName.getSchemaName(), tableName.getTableName()));
+ return new TrinoTableHandle(
+ tableName.getSchemaName(), tableName.getTableName(), dynamicOptions);
+ } catch (Catalog.TableNotExistException e) {
+ return null;
+ }
}
@Override
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
index 0aed279..e7cf7eb 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java
@@ -18,8 +18,6 @@
package org.apache.paimon.trino;
-import org.apache.paimon.table.BucketMode;
-
import com.google.inject.Inject;
import io.trino.spi.connector.BucketFunction;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
@@ -46,11 +44,7 @@ public class TrinoNodePartitioningProvider implements ConnectorNodePartitioningP
// todo support dynamic bucket tables
TrinoPartitioningHandle trinoPartitioningHandle =
(TrinoPartitioningHandle) partitioningHandle;
- if (trinoPartitioningHandle.getBucketMode() == BucketMode.FIXED) {
- return new FixedBucketTableShuffleFunction(
- partitionChannelTypes, trinoPartitioningHandle, workerCount);
- }
- throw new UnsupportedOperationException(
- "Unsupported table bucket mode: " + trinoPartitioningHandle.getBucketMode());
+ return new FixedBucketTableShuffleFunction(
+ partitionChannelTypes, trinoPartitioningHandle, workerCount);
}
}
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
index 1ee2601..af52d5f 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java
@@ -92,18 +92,11 @@ public class TrinoPageSinkProvider implements ConnectorPageSinkProvider {
BucketMode mode =
table instanceof FileStoreTable
? ((FileStoreTable) table).bucketMode()
- : BucketMode.FIXED;
+ : BucketMode.HASH_FIXED;
switch (mode) {
- case FIXED:
- case UNAWARE:
+ case HASH_FIXED:
+ case BUCKET_UNAWARE:
break;
- case DYNAMIC:
- case GLOBAL_DYNAMIC:
- if (table.primaryKeys().isEmpty()) {
- throw new IllegalArgumentException(
- "Only primary-key table can support dynamic bucket.");
- }
- throw new IllegalArgumentException("Global dynamic bucket mode are not supported");
default:
throw new IllegalArgumentException("Unknown bucket mode: " + mode);
}
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
index 41bb874..171dfd8 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
@@ -205,7 +205,7 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
new Path(indexFile.path()),
((FileStoreTable) table).fileIO(),
rowType)) {
- if (!fileIndexPredicate.testPredicate(paimonFilter.get())) {
+ if (!fileIndexPredicate.evaluate(paimonFilter.get()).remain()) {
continue;
}
}
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
index 0da69d8..224adc0 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java
@@ -19,7 +19,6 @@
package org.apache.paimon.trino;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.InstantiationUtil;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -33,14 +32,10 @@ import java.util.Arrays;
public class TrinoPartitioningHandle implements ConnectorPartitioningHandle {
private final byte[] schema;
- private final BucketMode bucketMode;
@JsonCreator
- public TrinoPartitioningHandle(
- @JsonProperty("schema") byte[] schema,
- @JsonProperty("bucketMode") BucketMode bucketMode) {
+ public TrinoPartitioningHandle(@JsonProperty("schema") byte[] schema) {
this.schema = schema;
- this.bucketMode = bucketMode;
}
@JsonProperty
@@ -48,17 +43,10 @@ public class TrinoPartitioningHandle implements ConnectorPartitioningHandle {
return schema;
}
- @JsonProperty
- public BucketMode getBucketMode() {
- return bucketMode;
- }
-
public TableSchema getOriginalSchema() {
try {
return InstantiationUtil.deserializeObject(this.schema, getClass().getClassLoader());
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (ClassNotFoundException e) {
+ } catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java
index 038e522..120b0e9 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java
@@ -72,7 +72,7 @@ public class TrinoSplitManager implements ConnectorSplitManager {
.convert(tableHandle.getFilter())
.ifPresent(readBuilder::withFilter);
tableHandle.getLimit().ifPresent(limit -> readBuilder.withLimit((int) limit));
- List<Split> splits = readBuilder.newScan().plan().splits();
+ List<Split> splits = readBuilder.dropStats().newScan().plan().splits();
long maxRowCount = splits.stream().mapToLong(Split::rowCount).max().orElse(0L);
double minimumSplitWeight = TrinoSessionProperties.getMinimumSplitWeight(session);
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
index c0b9344..3504fb5 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java
@@ -93,7 +93,6 @@ public class TrinoTableOptionUtils {
private static boolean isEnum(String className) {
switch (className) {
- case "FileFormatType":
case "StartupMode":
case "MergeEngine":
case "ChangelogProducer":
@@ -108,8 +107,6 @@ public class TrinoTableOptionUtils {
private static Class<?> buildClass(String className) {
switch (className) {
- case "FileFormatType":
- return CoreOptions.FileFormatType.class;
case "MergeEngine":
return CoreOptions.MergeEngine.class;
case "ChangelogProducer":
diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
index 2caab17..aab9057 100644
--- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
+++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java
@@ -20,14 +20,10 @@
package org.apache.paimon.trino.catalog;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.CatalogLockContext;
-import org.apache.paimon.catalog.CatalogLockFactory;
-import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.*;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -43,7 +39,6 @@ import org.apache.hadoop.conf.Configuration;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
/** Trino catalog, use it after set session. */
public class TrinoCatalog implements Catalog {
@@ -120,21 +115,11 @@ public class TrinoCatalog implements Catalog {
return current.fileIO();
}
- @Override
- public Optional<CatalogLockFactory> lockFactory() {
- return current.lockFactory();
- }
-
@Override
public List<String> listDatabases() {
return current.listDatabases();
}
- @Override
- public boolean databaseExists(String s) {
- return current.databaseExists(s);
- }
-
@Override
public void createDatabase(String s, boolean b, Map<String, String> map)
throws DatabaseAlreadyExistException {
@@ -142,8 +127,8 @@ public class TrinoCatalog implements Catalog {
}
@Override
- public Map<String, String> loadDatabaseProperties(String s) throws DatabaseNotExistException {
- return current.loadDatabaseProperties(s);
+ public Database getDatabase(String name) throws DatabaseNotExistException {
+ return current.getDatabase(name);
}
@Override
@@ -157,6 +142,11 @@ public class TrinoCatalog implements Catalog {
return current.getTable(identifier);
}
+ @Override
+ public Path getTableLocation(Identifier identifier) {
+ return current.getTableLocation(identifier);
+ }
+
@Override
public List<String> listTables(String s) throws DatabaseNotExistException {
return current.listTables(s);
@@ -185,12 +175,24 @@ public class TrinoCatalog implements Catalog {
current.alterTable(identifier, list, ignoreIfExists);
}
+ @Override
+ public void createPartition(Identifier identifier, Map<String, String> map)
+ throws TableNotExistException {
+ current.createPartition(identifier, map);
+ }
+
@Override
public void dropPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException, PartitionNotExistException {
current.dropPartition(identifier, partitions);
}
+ @Override
+ public List<PartitionEntry> listPartitions(Identifier identifier)
+ throws TableNotExistException {
+ return current.listPartitions(identifier);
+ }
+
@Override
public void close() throws Exception {
if (current != null) {
@@ -198,27 +200,12 @@ public class TrinoCatalog implements Catalog {
}
}
- @Override
- public Optional<CatalogLockContext> lockContext() {
- return current.lockContext();
- }
-
- @Override
- public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
- return current.metastoreClientFactory(identifier);
- }
-
@Override
public void createDatabase(String name, boolean ignoreIfExists)
throws DatabaseAlreadyExistException {
current.createDatabase(name, ignoreIfExists);
}
- @Override
- public boolean tableExists(Identifier identifier) {
- return current.tableExists(identifier);
- }
-
@Override
public void alterTable(Identifier identifier, SchemaChange change, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
@@ -226,7 +213,7 @@ public class TrinoCatalog implements Catalog {
}
@Override
- public boolean caseSensitive() {
- return current.caseSensitive();
+ public boolean allowUpperCase() {
+ return current.allowUpperCase();
}
}
diff --git a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
index d23a21e..7d97c7c 100644
--- a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
+++ b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java
@@ -18,7 +18,6 @@
package org.apache.paimon.trino;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.InstantiationUtil;
import io.airlift.json.JsonCodec;
@@ -35,8 +34,7 @@ public class TestTrinoPartitioningHandle {
@Test
public void testTrinoPartitioningHandle() throws Exception {
byte[] schemaData = InstantiationUtil.serializeObject("test_schema");
- TrinoPartitioningHandle expected =
- new TrinoPartitioningHandle(schemaData, BucketMode.FIXED);
+ TrinoPartitioningHandle expected = new TrinoPartitioningHandle(schemaData);
testRoundTrip(expected);
}
diff --git a/pom.xml b/pom.xml
index 77cdcb2..d137d6f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@ under the License.
</scm>
<properties>
- <paimon.version>0.8.0</paimon.version>
+ <paimon.version>1.0-SNAPSHOT</paimon.version>
<target.java.version>11</target.java.version>
<junit5.version>5.8.1</junit5.version>
<slf4j.version>2.0.7</slf4j.version>