[ https://issues.apache.org/jira/browse/DRILL-8353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636136#comment-17636136 ]
ASF GitHub Bot commented on DRILL-8353: --------------------------------------- vvysotskyi commented on code in PR #2702: URL: https://github.com/apache/drill/pull/2702#discussion_r1027065550 ########## contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.delta.plan; + +import io.delta.standalone.expressions.And; +import io.delta.standalone.expressions.EqualTo; +import io.delta.standalone.expressions.Expression; +import io.delta.standalone.expressions.GreaterThan; +import io.delta.standalone.expressions.GreaterThanOrEqual; +import io.delta.standalone.expressions.IsNotNull; +import io.delta.standalone.expressions.IsNull; +import io.delta.standalone.expressions.LessThan; +import io.delta.standalone.expressions.LessThanOrEqual; +import io.delta.standalone.expressions.Literal; +import io.delta.standalone.expressions.Not; +import io.delta.standalone.expressions.Or; +import io.delta.standalone.expressions.Predicate; +import io.delta.standalone.types.StructType; +import org.apache.drill.common.FunctionNames; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.expression.visitors.AbstractExprVisitor; + +public class DrillExprToDeltaTranslator extends AbstractExprVisitor<Expression, Void, RuntimeException> { + + private final StructType structType; + + public DrillExprToDeltaTranslator(StructType structType) { + this.structType = structType; + } + + @Override + public Expression visitFunctionCall(FunctionCall call, Void value) { + try { + return visitFunctionCall(call); + } catch (Exception e) { + return null; + } + } + + private Predicate visitFunctionCall(FunctionCall call) { + switch (call.getName()) { + case FunctionNames.AND: { + Expression left = call.arg(0).accept(this, null); + Expression right = call.arg(1).accept(this, null); + if (left != null && right != null) { + return new And(left, right); + } + return null; + } + case FunctionNames.OR: { + Expression left = call.arg(0).accept(this, null); + Expression right = call.arg(1).accept(this, null); + if (left != null && right != null) { + return new Or(left, right); + } + return null; + } + case FunctionNames.NOT: { + Expression expression = call.arg(0).accept(this, null); + if (expression != null) { + return new Not(expression); + } + return null; + } + case FunctionNames.IS_NULL: { + LogicalExpression arg = call.arg(0); + if (arg instanceof SchemaPath) { + String name = getPath((SchemaPath) arg); + return new IsNull(structType.column(name)); + } + return null; + } + case FunctionNames.IS_NOT_NULL: { + LogicalExpression arg = call.arg(0); + if (arg instanceof SchemaPath) { + String name = getPath((SchemaPath) arg); + return new IsNotNull(structType.column(name)); + } + return null; + } + case FunctionNames.LT: { + LogicalExpression nameRef = call.arg(0); + Expression expression = call.arg(1).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new LessThan(structType.column(name), expression); + } + return null; + } + case FunctionNames.LE: { + LogicalExpression nameRef = call.arg(0); + Expression expression = call.arg(1).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new LessThanOrEqual(structType.column(name), expression); + } + return null; + } + case FunctionNames.GT: { + LogicalExpression nameRef = call.args().get(0); + Expression expression = call.args().get(1).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new GreaterThan(structType.column(name), expression); + } + return null; + } + case FunctionNames.GE: { + LogicalExpression nameRef = call.args().get(0); + Expression expression = call.args().get(0).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new GreaterThanOrEqual(structType.column(name), expression); + } + return null; + } + case FunctionNames.EQ: { + LogicalExpression nameRef = call.args().get(0); + Expression expression = call.args().get(1).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new EqualTo(structType.column(name), expression); + } + return null; + } + case FunctionNames.NE: { + LogicalExpression nameRef = call.args().get(0); + Expression expression = call.args().get(1).accept(this, null); + if (nameRef instanceof SchemaPath) { + String name = getPath((SchemaPath) nameRef); + return new Not(new EqualTo(structType.column(name), expression)); + } + return null; + } + } + return null; + } + + @Override + public Expression visitFloatConstant(ValueExpressions.FloatExpression fExpr, Void value) { + return Literal.of(fExpr.getFloat()); + } + + @Override + public Expression visitIntConstant(ValueExpressions.IntExpression intExpr, Void value) { + return Literal.of(intExpr.getInt()); + } + + @Override + public Expression visitLongConstant(ValueExpressions.LongExpression longExpr, Void value) { + return Literal.of(longExpr.getLong()); + } + + @Override + public Expression visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Void value) { + return Literal.of(decExpr.getIntFromDecimal()); + } + + @Override + public Expression visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Void value) { + return Literal.of(decExpr.getLongFromDecimal()); + } + + @Override + public Expression visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Void value) { + return Literal.of(decExpr.getBigDecimal()); + } + + @Override + public Expression visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Void value) { + return Literal.of(decExpr.getBigDecimal()); + } + + @Override + public Expression visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) { + return Literal.of(decExpr.getBigDecimal()); + } + + @Override + public Expression visitDateConstant(ValueExpressions.DateExpression dateExpr, Void value) { + return Literal.of(dateExpr.getDate()); + } + + @Override + public Expression visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Void value) { + return Literal.of(timeExpr.getTime()); + } + + @Override + public Expression visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Void value) { + return Literal.of(timestampExpr.getTimeStamp()); + } + + @Override + public Expression visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Void value) { + return Literal.of(dExpr.getDouble()); + } + + @Override + public Expression visitBooleanConstant(ValueExpressions.BooleanExpression e, Void value) { + return Literal.of(e.getBoolean()); + } + + @Override + public Expression visitQuotedStringConstant(ValueExpressions.QuotedString e, Void value) { + return Literal.of(e.getString()); + } + + @Override + public Expression visitUnknown(LogicalExpression e, Void value) { + return null; + } + + private static String getPath(SchemaPath schemaPath) { Review Comment: Yes, I think `element` keyword usage is not common. ########## contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java: ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.delta.format; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.metastore.MetadataProviderManager; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.PlannerPhase; +import org.apache.drill.exec.planner.common.DrillStatsTable; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.schema.SchemaProvider; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.PluginRulesProviderImpl; +import org.apache.drill.exec.store.StoragePluginRulesSupplier; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.drill.exec.store.delta.DeltaGroupScan; +import org.apache.drill.exec.store.delta.plan.DeltaPluginImplementor; +import org.apache.drill.exec.store.parquet.ParquetReaderConfig; +import org.apache.drill.exec.store.plan.rel.PluginRel; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +public class DeltaFormatPlugin implements FormatPlugin { + + private static final String DELTA_CONVENTION_PREFIX = "DELTA."; + + /** + * Generator for format id values. Formats with the same name may be defined + * in multiple storage plugins, so using the unique id within the convention name + * to ensure the rule names will be unique for different plugin instances. + */ + private static final AtomicInteger NEXT_ID = new AtomicInteger(0); + + private final FileSystemConfig storageConfig; + + private final DeltaFormatPluginConfig config; + + private final Configuration fsConf; + + private final DrillbitContext context; + + private final String name; + + private final DeltaFormatMatcher matcher; + + private final StoragePluginRulesSupplier storagePluginRulesSupplier; + + public DeltaFormatPlugin( + String name, + DrillbitContext context, + Configuration fsConf, + FileSystemConfig storageConfig, + DeltaFormatPluginConfig config) { + this.storageConfig = storageConfig; + this.config = config; + this.fsConf = fsConf; + this.context = context; + this.name = name; + this.matcher = new DeltaFormatMatcher(this); + this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement()); + } + + private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) { + Convention convention = new Convention.Impl(DELTA_CONVENTION_PREFIX + name, PluginRel.class); + return StoragePluginRulesSupplier.builder() + .rulesProvider(new PluginRulesProviderImpl(convention, DeltaPluginImplementor::new)) + .supportsFilterPushdown(true) + .supportsProjectPushdown(true) + .supportsLimitPushdown(true) + .convention(convention) + .build(); + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public boolean supportsWrite() { + return false; + } + + @Override + public boolean supportsAutoPartitioning() { + return false; + } + + @Override + public FormatMatcher getMatcher() { + return matcher; + } + + @Override + public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) { + throw new UnsupportedOperationException(); + } + + @Override + public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) { + switch (phase) { + case PHYSICAL: + case LOGICAL: + return storagePluginRulesSupplier.getOptimizerRules(); + case LOGICAL_PRUNE_AND_JOIN: + case LOGICAL_PRUNE: + case PARTITION_PRUNING: + case JOIN_PLANNING: + default: + return Collections.emptySet(); + } + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException { + return DeltaGroupScan.builder() + .userName(userName) + .formatPlugin(this) + .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build()) + .path(selection.selectionRoot.toUri().getPath()) + .columns(columns) + .limit(-1) + .build(); + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, + List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException { + SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider(); + TupleMetadata schema = schemaProvider != null + ? schemaProvider.read().getSchema() + : null; + return DeltaGroupScan.builder() + .userName(userName) + .formatPlugin(this) + .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build()) + .schema(schema) + .path(selection.selectionRoot.toUri().getPath()) + .columns(columns) + .limit(-1) + .build(); + } + + @Override + public boolean supportsStatistics() { + return false; + } + + @Override + public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) { + throw new UnsupportedOperationException("unimplemented"); Review Comment: It uses stats files that Drill creates (currently, only parquet format is supported), so for the case of the Delta plugin, I'm not sure whether it is a good idea to create extra files there. ########## contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.delta.read; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.ExecutorFragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.ColumnExplorer; +import org.apache.drill.exec.store.delta.DeltaRowGroupScan; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan; +import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator; +import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator; +import org.apache.drill.exec.store.parquet.RowGroupReadEntry; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + +import java.util.List; +import java.util.Map; + +@SuppressWarnings("unused") +public class DeltaScanBatchCreator extends AbstractParquetScanBatchCreator Review Comment: Yes, this and other options are applicable to the delta plugin. ########## contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.delta.format; + +import io.delta.standalone.DeltaLog; +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.store.plan.rel.PluginDrillTable; +import org.apache.hadoop.fs.FileStatus; + +public class DeltaFormatMatcher extends FormatMatcher { + + private final DeltaFormatPlugin formatPlugin; + + public DeltaFormatMatcher(DeltaFormatPlugin formatPlugin) { + this.formatPlugin = formatPlugin; + } + + @Override + public boolean supportDirectoryReads() { Review Comment: No, it was before. It means that the Drill plugin is able to handle queries where the directory is specified, not the file. > Format plugin for Delta Lake > ---------------------------- > > Key: DRILL-8353 > URL: https://issues.apache.org/jira/browse/DRILL-8353 > Project: Apache Drill > Issue Type: New Feature > Affects Versions: 1.20.2 > Reporter: Vova Vysotskyi > Assignee: Vova Vysotskyi > Priority: Major > Fix For: Future > > > Implement format plugin for Delta Lake. -- This message was sent by Atlassian Jira (v8.20.10#820010)