[ https://issues.apache.org/jira/browse/DRILL-8027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442797#comment-17442797 ]
ASF GitHub Bot commented on DRILL-8027: --------------------------------------- vvysotskyi commented on a change in pull request #2357: URL: https://github.com/apache/drill/pull/2357#discussion_r748352763 ########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/ExprToRex.java ########## @@ -62,31 +61,25 @@ public static RelDataTypeField findField(String fieldName, RelDataType rowType) return null; } - private RexNode makeItemOperator(String[] paths, int index, RelDataType rowType) { - if (index == 0) { //last one, return ITEM([0]-inputRef, [1] Literal) - final RelDataTypeField field = findField(paths[0], rowType); - return field == null ? null : builder.makeInputRef(field.getType(), field.getIndex()); - } - return builder.makeCall(SqlStdOperatorTable.ITEM, - makeItemOperator(paths, index - 1, rowType), - builder.makeLiteral(paths[index])); - } - @Override public RexNode visitSchemaPath(SchemaPath path, Void value) throws RuntimeException { - PathSegment.NameSegment rootSegment = path.getRootSegment(); - if (rootSegment.isLastPath()) { - final RelDataTypeField field = findField(rootSegment.getPath(), newRowType); - return field == null ? null : builder.makeInputRef(field.getType(), field.getIndex()); - } - List<String> paths = Lists.newArrayList(); - while (rootSegment != null) { - paths.add(rootSegment.getPath()); - rootSegment = (PathSegment.NameSegment) rootSegment.getChild(); + PathSegment pathSegment = path.getRootSegment(); + + RelDataTypeField field = findField(pathSegment.getNameSegment().getPath(), newRowType); + RexNode rexNode = field == null ? null : builder.makeInputRef(field.getType(), field.getIndex()); + while (!pathSegment.isLastPath()) { + pathSegment = pathSegment.getChild(); + RexNode ref; + if (pathSegment.isNamed()) { + ref = builder.makeLiteral(pathSegment.getNameSegment().getPath()); + } else { + ref = builder.makeBigintLiteral(BigDecimal.valueOf(pathSegment.getArraySegment().getIndex())); Review comment: Calcite API requires BigDecimal ########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java ########## @@ -56,7 +59,23 @@ AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException; - Set<StoragePluginOptimizerRule> getOptimizerRules(); + @Deprecated + default Set<? extends RelOptRule> getOptimizerRules() { + return Collections.emptySet(); + } + + default Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) { + switch (phase) { + case PHYSICAL: + return getOptimizerRules(); + case LOGICAL: + case JOIN_PLANNING:case LOGICAL_PRUNE_AND_JOIN: Review comment: Thanks, fixed. ########## File path: contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/format/IcebergFormatPlugin.java ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.iceberg.format; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.metastore.MetadataProviderManager; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.PlannerPhase; +import org.apache.drill.exec.planner.common.DrillStatsTable; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.schema.SchemaProvider; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.PluginRulesProviderImpl; +import org.apache.drill.exec.store.StoragePluginRulesSupplier; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.drill.exec.store.iceberg.IcebergGroupScan; +import org.apache.drill.exec.store.iceberg.plan.IcebergPluginImplementor; +import org.apache.drill.exec.store.plan.rel.PluginRel; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class IcebergFormatPlugin implements FormatPlugin { + + private static final String ICEBERG_CONVENTION_PREFIX = "ICEBERG."; + + private final FileSystemConfig storageConfig; + + private final IcebergFormatPluginConfig config; + + private final Configuration fsConf; + + private final DrillbitContext context; + + private final String name; + + private final IcebergFormatMatcher matcher; + + private final StoragePluginRulesSupplier storagePluginRulesSupplier; + + public IcebergFormatPlugin( + String name, + DrillbitContext context, + Configuration fsConf, + FileSystemConfig storageConfig, + IcebergFormatPluginConfig config) { + this.storageConfig = storageConfig; + this.config = config; + this.fsConf = fsConf; + this.context = context; + this.name = name; + this.matcher = new IcebergFormatMatcher(this); + this.storagePluginRulesSupplier = storagePluginRulesSupplier(name); + } + + private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) { + Convention convention = new Convention.Impl(ICEBERG_CONVENTION_PREFIX + name, PluginRel.class); + return StoragePluginRulesSupplier.builder() + .rulesProvider(new PluginRulesProviderImpl(convention, IcebergPluginImplementor::new)) + .supportsFilterPushdown(true) + .supportsProjectPushdown(true) + .supportsLimitPushdown(true) + .convention(convention) + .build(); + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public boolean supportsWrite() { + return false; + } + + @Override + public boolean supportsAutoPartitioning() { + return false; + } + + @Override + public FormatMatcher getMatcher() { + return matcher; + } + + @Override + public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) { + throw new UnsupportedOperationException(); + } + + @Override + public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) { + switch (phase) { + case PHYSICAL: + case LOGICAL: + return storagePluginRulesSupplier.getOptimizerRules(); + case LOGICAL_PRUNE_AND_JOIN: + case LOGICAL_PRUNE: + case PARTITION_PRUNING: + case JOIN_PLANNING: + default: + return Collections.emptySet(); + } + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) { + return IcebergGroupScan.builder() Review comment: Yep, using the builder looks better when we have a lot of arguments or some of them are nulls ########## File path: contrib/format-iceberg/README.md ########## @@ -0,0 +1,117 @@ +# Apache Iceberg format plugin + +This format plugin enabled Drill to query Apache Iceberg tables. + +Unlike regular format plugins, the Iceberg table is a folder with data and metadata files, but Drill checks the presence +of the `metadata` folder to ensure that the table is Iceberg one. + +Drill supports reading all formats of Iceberg tables available at this moment: Parquet, Avro, and ORC. +No need to provide actual table format, it will be discovered automatically. + +For details related to Apache Iceberg table format, please refer to [official docs](https://iceberg.apache.org/#). + +## Supported optimizations and features + +### Project pushdown + +This format plugin supports project and filter pushdown optimizations. + +For the case of project pushdown, only specified in the query columns will be read, even if it is a nested column. In +conjunction with column-oriented formats like Parquet or ORC, it allows improving reading performance significantly. + +### Filter pushdown + +For the case of filter pushdown, all expressions supported by Iceberg API will be pushed down, so only data that matches +the filter expression will be read. + +### Schema provisioning + +This format plugin supports the schema provisioning feature. Though Iceberg provides table schema, in some cases, it +might be useful to select data with customized schema, so it can be done using the table function: + +```sql +SELECT int_field, + string_field +FROM table(dfs.tmp.testAllTypes(schema => 'inline=(int_field varchar not null default `error`)')) +``` + +In this example, we convert int field to string and return `'error'` literals for null values. + +### Querying table metadata + +Apache Drill provides the ability to query any kind of table metadata Iceberg can return. + +At this point, Apache Iceberg has the following metadata kinds: + +* ENTRIES +* FILES +* HISTORY +* SNAPSHOTS +* MANIFESTS +* PARTITIONS +* ALL_DATA_FILES +* ALL_MANIFESTS +* ALL_ENTRIES + +To query specific metadata, just add the `#metadata_name` suffix to the table location, like in the following example: + +```sql +SELECT * +FROM dfs.tmp.`testAllTypes#snapshots` +``` + +### Querying specific table versions (snapshots) + +Apache Icebergs has the ability to track the table modifications and read specific version before or after modifications +or modifications itself. + +This storage plugin embraces this ability and provides an easy-to-use way of triggering it. Review comment: Thanks! ########## File path: contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/IcebergWork.java ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; +import org.apache.iceberg.CombinedScanTask; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Base64; + +@Value +@JsonSerialize(using = IcebergWork.IcebergWorkSerializer.class) +@JsonDeserialize(using = IcebergWork.IcebergWorkDeserializer.class) +public class IcebergWork { + CombinedScanTask scanTask; + + /** + * Special deserializer for {@link IcebergWork} class that deserializes + * {@code scanTask} filed from byte array string created using {@link java.io.Serializable}. + */ + @Slf4j + public static class IcebergWorkDeserializer extends StdDeserializer<IcebergWork> { + + public IcebergWorkDeserializer() { + super(IcebergWork.class); + } + + @Override + public IcebergWork deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonNode node = p.getCodec().readTree(p); + String scanTaskString = node.get(IcebergWorkSerializer.SCAN_TASK_FIELD).asText(); + try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder().decode(scanTaskString)))) { + Object scanTask = ois.readObject(); + return new IcebergWork((CombinedScanTask) scanTask); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage(), e); + } + + return null; + } + } + + /** + * Special serializer for {@link IcebergWork} class that serializes + * {@code scanTask} filed to byte array string created using {@link java.io.Serializable} Review comment: Thanks, fixed ########## File path: contrib/format-iceberg/README.md ########## @@ -0,0 +1,99 @@ +# Apache Iceberg format plugin + +This format plugin enabled Drill to query Apache Iceberg tables. + +Unlike regular format plugins, the Iceberg table is a folder with data and metadata files, but Drill checks the presence +of the `metadata` folder to ensure that the table is Iceberg one. + +Drill supports reading all formats of Iceberg tables available at this moment: Parquet, Avro, and ORC. +No need to provide actual table format, it will be discovered automatically. + +For details related to Apache Iceberg table format, please refer to [official docs](https://iceberg.apache.org/#). + +## Supported optimizations and features + +### Project pushdown + +This format plugin supports project and filter pushdown optimizations. + +For the case of project pushdown, only specified in the query columns will be read, even if it is a nested column. In +conjunction with column-oriented formats like Parquet or ORC, it allows improving reading performance significantly. + +### Filter pushdown + +For the case of filter pushdown, all expressions supported by Iceberg API will be pushed down, so only data that matches +the filter expression will be read. + +### Schema provisioning + +This format plugin supports the schema provisioning feature. Though Iceberg provides table schema, in some cases, it +might be useful to select data with customized schema, so it can be done using the table function: + +```sql +SELECT int_field, + string_field +FROM table(dfs.tmp.testAllTypes(schema => 'inline=(int_field varchar not null default `error`)')) +``` + +In this example, we convert int field to string and return `'error'` literals for null values. + +### Querying table metadata + +Apache Drill provides the ability to query any kind of table metadata Iceberg can return. + +At this point, Apache Iceberg has the following metadata kinds: + +* ENTRIES +* FILES +* HISTORY +* SNAPSHOTS +* MANIFESTS +* PARTITIONS +* ALL_DATA_FILES +* ALL_MANIFESTS +* ALL_ENTRIES + +To query specific metadata, just add the `#metadata_name` suffix to the table location, like in the following example: + +```sql +SELECT * +FROM dfs.tmp.`testAllTypes#snapshots` +``` + +### Querying specific table versions (snapshots) + +Apache Icebergs has the ability to track the table modifications and read specific version before or after modifications +or modifications itself. + +This storage plugin embraces this ability and provides an easy-to-use way of triggering it. + +The following ways of specifying table version are supported: + +- `snapshotId` - id of the specific snapshot +- `snapshotAsOfTime` - the most recent snapshot as of the given time in milliseconds +- `fromSnapshotId` - read appended data from `fromSnapshotId` exclusive to the current snapshot inclusive +- \[`fromSnapshotId` : `toSnapshotId`\] - read appended data from `fromSnapshotId` exclusive to `toSnapshotId` inclusive + +Table function can be used to specify one of the above configs in the following way: + Review comment: Thanks! ########## File path: contrib/format-iceberg/README.md ########## @@ -0,0 +1,99 @@ +# Apache Iceberg format plugin + +This format plugin enabled Drill to query Apache Iceberg tables. + +Unlike regular format plugins, the Iceberg table is a folder with data and metadata files, but Drill checks the presence +of the `metadata` folder to ensure that the table is Iceberg one. + +Drill supports reading all formats of Iceberg tables available at this moment: Parquet, Avro, and ORC. +No need to provide actual table format, it will be discovered automatically. + +For details related to Apache Iceberg table format, please refer to [official docs](https://iceberg.apache.org/#). + +## Supported optimizations and features + +### Project pushdown + +This format plugin supports project and filter pushdown optimizations. + +For the case of project pushdown, only specified in the query columns will be read, even if it is a nested column. In +conjunction with column-oriented formats like Parquet or ORC, it allows improving reading performance significantly. + +### Filter pushdown + +For the case of filter pushdown, all expressions supported by Iceberg API will be pushed down, so only data that matches +the filter expression will be read. + +### Schema provisioning + +This format plugin supports the schema provisioning feature. Though Iceberg provides table schema, in some cases, it +might be useful to select data with customized schema, so it can be done using the table function: + +```sql +SELECT int_field, + string_field +FROM table(dfs.tmp.testAllTypes(schema => 'inline=(int_field varchar not null default `error`)')) +``` + +In this example, we convert int field to string and return `'error'` literals for null values. + +### Querying table metadata + +Apache Drill provides the ability to query any kind of table metadata Iceberg can return. + +At this point, Apache Iceberg has the following metadata kinds: + +* ENTRIES +* FILES +* HISTORY +* SNAPSHOTS +* MANIFESTS +* PARTITIONS +* ALL_DATA_FILES +* ALL_MANIFESTS +* ALL_ENTRIES + +To query specific metadata, just add the `#metadata_name` suffix to the table location, like in the following example: + +```sql +SELECT * +FROM dfs.tmp.`testAllTypes#snapshots` +``` + +### Querying specific table versions (snapshots) + +Apache Icebergs has the ability to track the table modifications and read specific version before or after modifications Review comment: Good catch ########## File path: contrib/format-iceberg/README.md ########## @@ -0,0 +1,117 @@ +# Apache Iceberg format plugin + +This format plugin enabled Drill to query Apache Iceberg tables. + +Unlike regular format plugins, the Iceberg table is a folder with data and metadata files, but Drill checks the presence +of the `metadata` folder to ensure that the table is Iceberg one. + +Drill supports reading all formats of Iceberg tables available at this moment: Parquet, Avro, and ORC. +No need to provide actual table format, it will be discovered automatically. + +For details related to Apache Iceberg table format, please refer to [official docs](https://iceberg.apache.org/#). + +## Supported optimizations and features + +### Project pushdown + +This format plugin supports project and filter pushdown optimizations. + +For the case of project pushdown, only specified in the query columns will be read, even if it is a nested column. In Review comment: Thanks, corrected ########## File path: contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/IcebergWork.java ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; +import org.apache.iceberg.CombinedScanTask; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Base64; + +@Value +@JsonSerialize(using = IcebergWork.IcebergWorkSerializer.class) +@JsonDeserialize(using = IcebergWork.IcebergWorkDeserializer.class) +public class IcebergWork { + CombinedScanTask scanTask; + + /** + * Special deserializer for {@link IcebergWork} class that deserializes + * {@code scanTask} filed from byte array string created using {@link java.io.Serializable}. Review comment: Thanks, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Format plugin for Apache Iceberg > -------------------------------- > > Key: DRILL-8027 > URL: https://issues.apache.org/jira/browse/DRILL-8027 > Project: Apache Drill > Issue Type: New Feature > Affects Versions: 1.20.0 > Reporter: Vova Vysotskyi > Assignee: Vova Vysotskyi > Priority: Major > Labels: plugin > Fix For: 1.20.0 > > > Implement a format plugin for Apache Iceberg. > Plugin should be able to: > - support reading data from Iceberg tables in Parquet, Avro, and ORC formats > - push down fields used in the project > - push down supported filter expressions > - spit and parallelize reading tasks > - provide a way for specifying Iceberg-specific configurations > - read specific snapshot versions if configured > - read table metadata (entries, files, history, snapshots, manifests, > partitions, etc.) > - support schema provisioning -- This message was sent by Atlassian Jira (v8.20.1#820001)