arina-ielchiieva commented on a change in pull request #1501: DRILL-6791: Scan projection framework URL: https://github.com/apache/drill/pull/1501#discussion_r233458212
########## File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java ########## @@ -0,0 +1,681 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.impl.protocol.SchemaTracker; +import org.apache.drill.exec.physical.impl.scan.ScanTestUtils; +import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; +import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection; +import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator; +import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator; +import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother; 
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException; +import org.apache.drill.exec.physical.impl.scan.project.SmoothingProjection; +import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection; +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; +import org.junit.Test; + +/** + * Tests schema smoothing at the schema projection level. + * This level handles reusing prior types when filling null + * values. But, because no actual vectors are involved, it + * does not handle the schema chosen for a table ahead of + * time, only the schema as it is merged with prior schema to + * detect missing columns. + * <p> + * Focuses on the <tt>SmoothingProjection</tt> class itself. + * <p> + * Note that, at present, schema smoothing does not work for entire + * maps. That is, if file 1 has, say <tt>{a: {b: 10, c: "foo"}}</tt> + * and file 2 has, say, <tt>{a: null}</tt>, then schema smoothing does + * not currently know how to recreate the map. The same is true of + * lists and unions. Handling such cases is complex and is probably + * better handled via a system that allows the user to specify their + * intent by providing a schema to apply to the two files. + */ + +public class TestSchemaSmoothing extends SubOperatorTest { + + /** + * Low-level test of the smoothing projection, including the exceptions + * it throws when things are not going its way. 
+ */ + + @Test + public void testSmoothingProjection() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + // Table 1: (a: nullable bigint, b) + + final TupleMetadata schema1 = new SchemaBuilder() + .addNullable("a", MinorType.BIGINT) + .addNullable("b", MinorType.VARCHAR) + .add("c", MinorType.FLOAT8) + .buildSchema(); + ResolvedRow priorSchema; + { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new WildcardSchemaProjection( + scanProj, schema1, rootTuple, + ScanTestUtils.resolvers()); + priorSchema = rootTuple; + } + + // Table 2: (a: nullable bigint, c), column omitted, original schema preserved + + final TupleMetadata schema2 = new SchemaBuilder() + .addNullable("a", MinorType.BIGINT) + .add("c", MinorType.FLOAT8) + .buildSchema(); + try { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new SmoothingProjection( + scanProj, schema2, priorSchema, rootTuple, + ScanTestUtils.resolvers()); + assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple))); + priorSchema = rootTuple; + } catch (final IncompatibleSchemaException e) { + fail(); + } + + // Table 3: (a, c, d), column added, must replan schema + + final TupleMetadata schema3 = new SchemaBuilder() + .addNullable("a", MinorType.BIGINT) + .addNullable("b", MinorType.VARCHAR) + .add("c", MinorType.FLOAT8) + .add("d", MinorType.INT) + .buildSchema(); + try { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new SmoothingProjection( + scanProj, schema3, priorSchema, rootTuple, + ScanTestUtils.resolvers()); + fail(); + } catch (final IncompatibleSchemaException e) { + // Expected + } + + // Table 4: (a: double), change type must replan schema + + final TupleMetadata schema4 = new 
SchemaBuilder() + .addNullable("a", MinorType.FLOAT8) + .addNullable("b", MinorType.VARCHAR) + .add("c", MinorType.FLOAT8) + .buildSchema(); + try { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new SmoothingProjection( + scanProj, schema4, priorSchema, rootTuple, + ScanTestUtils.resolvers()); + fail(); + } catch (final IncompatibleSchemaException e) { + // Expected + } + + // Table 5: Drop a non-nullable column, must replan + + final TupleMetadata schema6 = new SchemaBuilder() + .addNullable("a", MinorType.BIGINT) + .addNullable("b", MinorType.VARCHAR) + .buildSchema(); + try { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new SmoothingProjection( + scanProj, schema6, priorSchema, rootTuple, + ScanTestUtils.resolvers()); + fail(); + } catch (final IncompatibleSchemaException e) { + // Expected + } + } + + /** + * Case in which the table schema is a superset of the prior + * schema. Discard prior schema. Turn off auto expansion of + * metadata for a simpler test. 
+ */ + + @Test + public void testSmaller() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildSchema(); + + { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + smoother.resolve(priorSchema, rootTuple); + assertEquals(1, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema)); + } + { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + smoother.resolve(tableSchema, rootTuple); + assertEquals(2, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); + } + } + + /** + * Case in which the table schema and prior are disjoint + * sets. Discard the prior schema. 
+ */ + + @Test + public void testDisjoint() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .buildSchema(); + + { + doResolve(smoother, priorSchema); + } + { + final ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(2, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); + } + } + + private ResolvedRow doResolve(SchemaSmoother smoother, TupleMetadata schema) { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + smoother.resolve(schema, rootTuple); + return rootTuple; + } + + /** + * Column names match, but types differ. Discard the prior schema. + */ + + @Test + public void testDifferentTypes() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("b", MinorType.VARCHAR) + .buildSchema(); + + { + doResolve(smoother, priorSchema); + } + { + final ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(2, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); + } + } + + /** + * The prior and table schemas are identical. 
Preserve the prior + * schema (though, the output is no different than if we discarded + * the prior schema...) + */ + + @Test + public void testSameSchemas() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildSchema(); + + { + doResolve(smoother, priorSchema); + } + { + final ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(1, smoother.schemaVersion()); + final TupleMetadata actualSchema = ScanTestUtils.schema(rootTuple); + assertTrue(actualSchema.isEquivalent(tableSchema)); + assertTrue(actualSchema.isEquivalent(priorSchema)); + } + } + + /** + * The prior and table schemas are identical, but the cases of names differ. + * Preserve the case of the first schema. 
+ */ + + @Test + public void testDifferentCase() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .add("A", MinorType.INT) + .add("B", MinorType.VARCHAR) + .buildSchema(); + + { + doResolve(smoother, priorSchema); + } + { + final ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(1, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema)); + final List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals("a", columns.get(0).name()); + } + } + + /** + * Can't preserve the prior schema if it had required columns + * where the new schema has no columns. + */ + + @Test + public void testRequired() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("b", MinorType.VARCHAR) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .addNullable("b", MinorType.VARCHAR) + .buildSchema(); + + { + doResolve(smoother, priorSchema); + } + { + final ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(2, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); + } + } + + /** + * Preserve the prior schema if table is a subset and missing columns + * are nullable or repeated. 
+ */ + + @Test + public void testMissingNullableColumns() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .addNullable("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .addArray("c", MinorType.BIGINT) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .buildSchema(); + + { + doResolve(smoother, priorSchema); + } + { + final ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(1, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema)); + } + } + + /** + * Preserve the prior schema if table is a subset. Map the table + * columns to the output using the prior schema ordering. + */ + + @Test + public void testReordering() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .addNullable("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .addArray("c", MinorType.BIGINT) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addNullable("a", MinorType.INT) + .buildSchema(); + + { + doResolve(smoother, priorSchema); + } + { + final ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(1, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema)); + } + } + /** + * Integrated test across multiple schemas at the batch level. 
+ */ + + @Test + public void testSmoothableSchemaBatches() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + // Table 1: (a: bigint, b) + + final TupleMetadata schema1 = new SchemaBuilder() + .addNullable("a", MinorType.BIGINT) + .addNullable("b", MinorType.VARCHAR) + .add("c", MinorType.FLOAT8) + .buildSchema(); + { + final ResolvedRow rootTuple = doResolve(smoother, schema1); + + // Just use the original schema. + + assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple))); + assertEquals(1, smoother.schemaVersion()); + } + + // Table 2: (a: nullable bigint, c), column ommitted, original schema preserved + + final TupleMetadata schema2 = new SchemaBuilder() + .addNullable("a", MinorType.BIGINT) + .add("c", MinorType.FLOAT8) + .buildSchema(); + { + final ResolvedRow rootTuple = doResolve(smoother, schema2); + assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple))); + assertEquals(1, smoother.schemaVersion()); + } + + // Table 3: (a, c, d), column added, must replan schema + + final TupleMetadata schema3 = new SchemaBuilder() + .addNullable("a", MinorType.BIGINT) + .addNullable("b", MinorType.VARCHAR) + .add("c", MinorType.FLOAT8) + .add("d", MinorType.INT) + .buildSchema(); + { + final ResolvedRow rootTuple = doResolve(smoother, schema3); + assertTrue(schema3.isEquivalent(ScanTestUtils.schema(rootTuple))); + assertEquals(2, smoother.schemaVersion()); + } + + // Table 4: Drop a non-nullable column, must replan + + final TupleMetadata schema4 = new SchemaBuilder() + .addNullable("a", MinorType.BIGINT) + .addNullable("b", MinorType.VARCHAR) + .buildSchema(); + { + final ResolvedRow rootTuple = doResolve(smoother, schema4); + assertTrue(schema4.isEquivalent(ScanTestUtils.schema(rootTuple))); + assertEquals(3, smoother.schemaVersion()); + } + + // Table 5: (a: double), change 
type must replan schema + + final TupleMetadata schema5 = new SchemaBuilder() + .addNullable("a", MinorType.FLOAT8) + .addNullable("b", MinorType.VARCHAR) + .buildSchema(); + { + final ResolvedRow rootTuple = doResolve(smoother, schema5); + assertTrue(schema5.isEquivalent(ScanTestUtils.schema(rootTuple))); + assertEquals(4, smoother.schemaVersion()); + } + +// // Table 6: (a: not-nullable bigint): convert to nullable for consistency Review comment: Please clean up. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
