[ https://issues.apache.org/jira/browse/DRILL-8353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634334#comment-17634334 ]
ASF GitHub Bot commented on DRILL-8353: --------------------------------------- jnturton commented on code in PR #2702: URL: https://github.com/apache/drill/pull/2702#discussion_r1017953161 ########## contrib/format-deltalake/README.md: ########## @@ -0,0 +1,36 @@ +# Delta Lake format plugin + +This format plugin enabled Drill to query Delta Lake tables. Review Comment: ```suggestion This format plugin enables Drill to query Delta Lake tables. ``` ########## contrib/format-deltalake/README.md: ########## @@ -0,0 +1,36 @@ +# Delta Lake format plugin + +This format plugin enabled Drill to query Delta Lake tables. + +## Supported optimizations and features + +### Project pushdown + +This format plugin supports project and filter pushdown optimizations. + +For the case of project pushdown, only columns specified in the query will be read, even they are nested columns. Review Comment: ```suggestion For the case of project pushdown, only columns specified in the query will be read, even when they are nested columns. ``` ########## contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.delta; + +import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.common.logical.security.PlainCredentialsProvider; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.delta.format.DeltaFormatPluginConfig; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.math.BigDecimal; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME; +import static org.junit.Assert.assertEquals; + +public class DeltaQueriesTest extends ClusterTest { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher)); + + StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage(); + FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig(); + Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats()); + formats.put("delta", new DeltaFormatPluginConfig()); + FileSystemConfig newPluginConfig = new FileSystemConfig( + pluginConfig.getConnection(), + pluginConfig.getConfig(), + pluginConfig.getWorkspaces(), + formats, + PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER); + newPluginConfig.setEnabled(pluginConfig.isEnabled()); + pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig); + + dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-primitives")); + dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-partition-values")); + dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-nested-struct")); + } + + @Test + public void testSerDe() throws Exception { + String plan = queryBuilder().sql("select * from dfs.`data-reader-partition-values`").explainJson(); + long count = queryBuilder().physical(plan).run().recordCount(); + assertEquals(3, count); + } + + @Test + public void testAllPrimitives() throws Exception { + testBuilder() + .sqlQuery("select * from dfs.`data-reader-primitives`") + .ordered() + .baselineColumns("as_int", "as_long", "as_byte", "as_short", "as_boolean", "as_float", + "as_double", "as_string", "as_binary", "as_big_decimal") + .baselineValues(null, null, null, null, null, null, null, null, null, null) + .baselineValues(0, 0L, 0, 0, true, 0.0f, 0.0, "0", new byte[]{0, 0}, BigDecimal.valueOf(0)) + .baselineValues(1, 1L, 1, 1, false, 1.0f, 1.0, "1", new byte[]{1, 1}, BigDecimal.valueOf(1)) + .baselineValues(2, 2L, 2, 2, true, 2.0f, 2.0, "2", new byte[]{2, 2}, BigDecimal.valueOf(2)) + .baselineValues(3, 3L, 3, 3, false, 3.0f, 3.0, "3", new byte[]{3, 3}, BigDecimal.valueOf(3)) + .baselineValues(4, 4L, 4, 4, true, 4.0f, 4.0, "4", new byte[]{4, 4}, BigDecimal.valueOf(4)) + .baselineValues(5, 5L, 5, 5, false, 5.0f, 5.0, "5", new byte[]{5, 5}, BigDecimal.valueOf(5)) + .baselineValues(6, 6L, 6, 6, true, 6.0f, 6.0, "6", new byte[]{6, 6}, BigDecimal.valueOf(6)) + .baselineValues(7, 7L, 7, 7, false, 7.0f, 7.0, "7", new byte[]{7, 7}, BigDecimal.valueOf(7)) + .baselineValues(8, 8L, 8, 8, true, 8.0f, 8.0, "8", new byte[]{8, 8}, BigDecimal.valueOf(8)) + .baselineValues(9, 9L, 9, 9, false, 9.0f, 9.0, "9", new byte[]{9, 9}, BigDecimal.valueOf(9)) + .go(); + } + + @Test + public void testProjectingColumns() throws Exception { + + String query = "select as_int, as_string from dfs.`data-reader-primitives`"; + + queryBuilder() + .sql(query) + .planMatcher() + .include("columns=\\[`as_int`, `as_string`\\]") + .match(); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("as_int", "as_string") + .baselineValues(null, null) + .baselineValues(0, "0") + .baselineValues(1, "1") + .baselineValues(2, "2") + .baselineValues(3, "3") + .baselineValues(4, "4") + .baselineValues(5, "5") + .baselineValues(6, "6") + .baselineValues(7, "7") + .baselineValues(8, "8") + .baselineValues(9, "9") + .go(); + } + + @Test + public void testProjectNestedColumn() throws Exception { + String query = "select t.a.ac.acb as acb, b from dfs.`data-reader-nested-struct` t"; + + queryBuilder() + .sql(query) + .planMatcher() + .include("columns=\\[`a`.`ac`.`acb`, `b`\\]") + .match(); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("acb", "b") + .baselineValues(0L, 0) + .baselineValues(1L, 1) + .baselineValues(2L, 2) + .baselineValues(3L, 3) + .baselineValues(4L, 4) + .baselineValues(5L, 5) + .baselineValues(6L, 6) + .baselineValues(7L, 7) + .baselineValues(8L, 8) + .baselineValues(9L, 9) + .go(); + } + + @Test + public void testPartitionPruning() throws Exception { + String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 1"; + + queryBuilder() + .sql(query) + .planMatcher() + .include("numFiles\\=1") + .match(); + + testBuilder() + .sqlQuery(query) + .ordered() + .baselineColumns("as_int", "as_string") + .baselineValues("1", "1") + .go(); + } + + @Test + public void testEmptyResults() throws Exception { + String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 101"; + + queryBuilder() + .sql(query) + .planMatcher() + .include("numFiles\\=1") + .match(); + + testBuilder() + .sqlQuery(query) + .ordered() + .expectsEmptyResultSet() + .go(); + } + + @Test + public void testLimit() throws Exception { + String query = "select as_int, as_string from dfs.`data-reader-partition-values` limit 1"; + + queryBuilder() + .sql(query) + .planMatcher() + .include("Limit\\(fetch\\=\\[1\\]\\)") Review Comment: ```suggestion // Note that both of the following two limits are expected because this format plugin supports an "artificial" limit. .include("Limit\\(fetch\\=\\[1\\]\\)") ``` ########## contrib/format-deltalake/README.md: ########## @@ -0,0 +1,36 @@ +# Delta Lake format plugin + +This format plugin enabled Drill to query Delta Lake tables. + +## Supported optimizations and features + +### Project pushdown + +This format plugin supports project and filter pushdown optimizations. + +For the case of project pushdown, only columns specified in the query will be read, even they are nested columns. + +### Filter pushdown + +For the case of filter pushdown, all expressions supported by Delta Lake API will be pushed down, so only data that +matches the filter expression will be read. Additionally, filtering logic for parquet files is enabled +to allow pruning of parquet files that do not match the filter expression. + +## Configuration + +Format plugin has the following configuration options: Review Comment: ```suggestion The format plugin has the following configuration options: ``` ########## contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.delta.read; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.ExecutorFragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.ColumnExplorer; +import org.apache.drill.exec.store.delta.DeltaRowGroupScan; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan; +import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator; +import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator; +import org.apache.drill.exec.store.parquet.RowGroupReadEntry; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + +import java.util.List; +import java.util.Map; + +@SuppressWarnings("unused") +public class DeltaScanBatchCreator extends AbstractParquetScanBatchCreator Review Comment: So because you've inherited from AbstractParquetScanBatchCreator here, the various options that control the Parquet reader like `store.parquet.use_new_reader` will be applied for Delta Lake tables? ########## contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java: ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.delta.format; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.metastore.MetadataProviderManager; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.PlannerPhase; +import org.apache.drill.exec.planner.common.DrillStatsTable; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.schema.SchemaProvider; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.PluginRulesProviderImpl; +import org.apache.drill.exec.store.StoragePluginRulesSupplier; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.drill.exec.store.delta.DeltaGroupScan; +import org.apache.drill.exec.store.delta.plan.DeltaPluginImplementor; +import org.apache.drill.exec.store.parquet.ParquetReaderConfig; +import org.apache.drill.exec.store.plan.rel.PluginRel; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +public class DeltaFormatPlugin implements FormatPlugin { + + private static final String DELTA_CONVENTION_PREFIX = "DELTA."; + + /** + * Generator for format id values. Formats with the same name may be defined + * in multiple storage plugins, so using the unique id within the convention name + * to ensure the rule names will be unique for different plugin instances. + */ + private static final AtomicInteger NEXT_ID = new AtomicInteger(0); + + private final FileSystemConfig storageConfig; + + private final DeltaFormatPluginConfig config; + + private final Configuration fsConf; + + private final DrillbitContext context; + + private final String name; + + private final DeltaFormatMatcher matcher; + + private final StoragePluginRulesSupplier storagePluginRulesSupplier; + + public DeltaFormatPlugin( + String name, + DrillbitContext context, + Configuration fsConf, + FileSystemConfig storageConfig, + DeltaFormatPluginConfig config) { + this.storageConfig = storageConfig; + this.config = config; + this.fsConf = fsConf; + this.context = context; + this.name = name; + this.matcher = new DeltaFormatMatcher(this); + this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement()); + } + + private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) { + Convention convention = new Convention.Impl(DELTA_CONVENTION_PREFIX + name, PluginRel.class); + return StoragePluginRulesSupplier.builder() + .rulesProvider(new PluginRulesProviderImpl(convention, DeltaPluginImplementor::new)) + .supportsFilterPushdown(true) + .supportsProjectPushdown(true) + .supportsLimitPushdown(true) + .convention(convention) + .build(); + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public boolean supportsWrite() { + return false; + } + + @Override + public boolean supportsAutoPartitioning() { + return false; + } + + @Override + public FormatMatcher getMatcher() { + return matcher; + } + + @Override + public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) { + throw new UnsupportedOperationException(); + } + + @Override + public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) { + switch (phase) { + case PHYSICAL: + case LOGICAL: + return storagePluginRulesSupplier.getOptimizerRules(); + case LOGICAL_PRUNE_AND_JOIN: + case LOGICAL_PRUNE: + case PARTITION_PRUNING: + case JOIN_PLANNING: + default: + return Collections.emptySet(); + } + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException { + return DeltaGroupScan.builder() + .userName(userName) + .formatPlugin(this) + .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build()) + .path(selection.selectionRoot.toUri().getPath()) + .columns(columns) + .limit(-1) + .build(); + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, + List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException { + SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider(); + TupleMetadata schema = schemaProvider != null + ? schemaProvider.read().getSchema() + : null; + return DeltaGroupScan.builder() + .userName(userName) + .formatPlugin(this) + .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build()) + .schema(schema) + .path(selection.selectionRoot.toUri().getPath()) + .columns(columns) + .limit(-1) + .build(); + } + + @Override + public boolean supportsStatistics() { + return false; + } + + @Override + public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) { + throw new UnsupportedOperationException("unimplemented"); Review Comment: Could this in principle be implemented for Delta Lake? ########## contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.delta.plan; + +import io.delta.standalone.expressions.And; +import io.delta.standalone.expressions.EqualTo; +import io.delta.standalone.expressions.Expression; +import io.delta.standalone.expressions.GreaterThan; +import io.delta.standalone.expressions.GreaterThanOrEqual; +import io.delta.standalone.expressions.IsNotNull; +import io.delta.standalone.expressions.IsNull; +import io.delta.standalone.expressions.LessThan; +import io.delta.standalone.expressions.LessThanOrEqual; +import io.delta.standalone.expressions.Literal; +import io.delta.standalone.expressions.Not; +import io.delta.standalone.expressions.Or; +import io.delta.standalone.expressions.Predicate; +import io.delta.standalone.types.StructType; +import org.apache.drill.common.FunctionNames; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.expression.visitors.AbstractExprVisitor; + +public class DrillExprToDeltaTranslator extends AbstractExprVisitor<Expression, Void, RuntimeException> { + + private final StructType structType; + + public DrillExprToDeltaTranslator(StructType structType) { + this.structType = structType; + } + + @Override + public Expression visitFunctionCall(FunctionCall call, Void value) { + try { + return visitFunctionCall(call); + } catch (Exception e) { + return null; + } + } + + private Predicate visitFunctionCall(FunctionCall call) { + switch (call.getName()) { + case FunctionNames.AND: { + Expression left = call.arg(0).accept(this, null); + Expression right = call.arg(1).accept(this, null); + if (left != null && right != null) { + return new And(left, right); + } + return null; + } + case FunctionNames.OR: { + Expression left = call.arg(0).accept(this, null); + Expression right = call.arg(1).accept(this, null); + if (left != null && right != null) { + return new Or(left, right); + } + return null; + } + case FunctionNames.NOT: { + Expression expression = call.arg(0).accept(this, null); + if (expression != null) { + return new Not(expression); + } + return null; + } + case FunctionNames.IS_NULL: { + LogicalExpression arg = call.arg(0); + if (arg instanceof SchemaPath) { + String name = getPath((SchemaPath) arg); + return new IsNull(structType.column(name)); + } + return null; + } + case FunctionNames.IS_NOT_NULL: { + LogicalExpression arg = call.arg(0); + if (arg instanceof SchemaPath) { + String name = getPath((SchemaPath) arg); + return new IsNotNull(structType.column(name)); + } + return null; + } + case FunctionNames.LT: { + LogicalExpression nameRef = call.arg(0); + Expression expression = call.arg(1).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new LessThan(structType.column(name), expression); + } + return null; + } + case FunctionNames.LE: { + LogicalExpression nameRef = call.arg(0); + Expression expression = call.arg(1).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new LessThanOrEqual(structType.column(name), expression); + } + return null; + } + case FunctionNames.GT: { + LogicalExpression nameRef = call.args().get(0); + Expression expression = call.args().get(1).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new GreaterThan(structType.column(name), expression); + } + return null; + } + case FunctionNames.GE: { + LogicalExpression nameRef = call.args().get(0); + Expression expression = call.args().get(0).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new GreaterThanOrEqual(structType.column(name), expression); + } + return null; + } + case FunctionNames.EQ: { + LogicalExpression nameRef = call.args().get(0); + Expression expression = call.args().get(1).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new EqualTo(structType.column(name), expression); + } + return null; + } + case FunctionNames.NE: { + LogicalExpression nameRef = call.args().get(0); + Expression expression = call.args().get(1).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new Not(new EqualTo(structType.column(name), expression)); + } + return null; + } + } + return null; + } + + @Override + public Expression visitFloatConstant(ValueExpressions.FloatExpression fExpr, Void value) { + return Literal.of(fExpr.getFloat()); + } + + @Override + public Expression visitIntConstant(ValueExpressions.IntExpression intExpr, Void value) { + return Literal.of(intExpr.getInt()); + } + + @Override + public Expression visitLongConstant(ValueExpressions.LongExpression longExpr, Void value) { + return Literal.of(longExpr.getLong()); + } + + @Override + public Expression visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Void value) { + return Literal.of(decExpr.getIntFromDecimal()); + } + + @Override + public Expression visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Void value) { + return Literal.of(decExpr.getLongFromDecimal()); + } + + @Override + public Expression visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Void value) { + return Literal.of(decExpr.getBigDecimal()); + } + + @Override + public Expression visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Void value) { + return Literal.of(decExpr.getBigDecimal()); + } + + @Override + public Expression visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) { + return Literal.of(decExpr.getBigDecimal()); + } + + @Override + public Expression visitDateConstant(ValueExpressions.DateExpression dateExpr, Void value) { + return Literal.of(dateExpr.getDate()); + } + + @Override + public Expression visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Void value) { + return Literal.of(timeExpr.getTime()); + } + + @Override + public Expression visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Void value) { + return Literal.of(timestampExpr.getTimeStamp()); + } + + @Override + public Expression visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Void value) { + return Literal.of(dExpr.getDouble()); + } + + @Override + public Expression visitBooleanConstant(ValueExpressions.BooleanExpression e, Void value) { + return Literal.of(e.getBoolean()); + } + + @Override + public Expression visitQuotedStringConstant(ValueExpressions.QuotedString e, Void value) { + return Literal.of(e.getString()); + } + + @Override + public Expression visitUnknown(LogicalExpression e, Void value) { + return null; + } + + private static String getPath(SchemaPath schemaPath) { Review Comment: Is any of this generic enough to go to a schema utils class, or is the "element" keyword specific to Delta Lake? ########## contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.delta.format; + +import io.delta.standalone.DeltaLog; +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.store.plan.rel.PluginDrillTable; +import org.apache.hadoop.fs.FileStatus; + +public class DeltaFormatMatcher extends FormatMatcher { + + private final DeltaFormatPlugin formatPlugin; + + public DeltaFormatMatcher(DeltaFormatPlugin formatPlugin) { + this.formatPlugin = formatPlugin; + } + + @Override + public boolean supportDirectoryReads() { Review Comment: Is this something that was introduced by the Iceberg format plugin? ########## exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java: ########## @@ -93,4 +93,12 @@ default void visitChild(RelNode input) throws IOException { * to ensure returning the correct rows number. */ boolean artificialLimit(); + + /** + * If the plugin doesn't support native filter pushdown, + * but the reader can prune the number of rows to read. + * In this case filter operator on top of the scan should be preserved + * to ensure returning the correct rows number. Review Comment: ```suggestion * but the reader can prune the set of rows to read. * In this case filter operator on top of the scan should be preserved * to ensure returning the correct subset of rows. ``` > Format plugin for Delta Lake > ---------------------------- > > Key: DRILL-8353 > URL: https://issues.apache.org/jira/browse/DRILL-8353 > Project: Apache Drill > Issue Type: New Feature > Affects Versions: 1.20.2 > Reporter: Vova Vysotskyi > Assignee: Vova Vysotskyi > Priority: Major > Fix For: Future > > > Implement format plugin for Delta Lake. -- This message was sent by Atlassian Jira (v8.20.10#820010)