arina-ielchiieva commented on a change in pull request #1711: DRILL-7011: Support schema in scan framework
URL: https://github.com/apache/drill/pull/1711#discussion_r268604275
##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
##########
@@ -82,6 +167,468 @@ public void testSchema() throws Exception {
.addRow(10, new LocalDate(2019, 3, 20), "it works!", 1234.5D, 20L, "")
.build();
RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ resetSchema();
+ }
+ }
+
+
+ /**
+ * Use a schema with explicit projection to get a consistent view
+ * of the table schema, even if columns are missing, rows are ragged,
+ * and column order changes.
+ * <p>
+ * Force the scans to occur in distinct fragments so the order of the
+ * file batches is random.
+ */
+ @Test
+ public void testMultiFileSchema() throws Exception {
+ RowSet expected1 = null;
+ RowSet expected2 = null;
+ try {
+ enableV3(true);
+ enableSchema(true);
+ enableMultiScan();
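+ // enableMultiScan() adjusts the minimum reader width so the two files are read by separate scan fragments.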
+ String tablePath = buildTwoFileTable("multiFileSchema",
raggedMulti1Contents, reordered2Contents);
+ run(SCHEMA_SQL, tablePath);
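+ // SCHEMA_SQL (defined earlier in this class) creates the schema file for the table.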
+
+ // The explicit project list covers the union of schema + table
+ // columns. In this case all table columns appear in the schema
+ // (though not all schema columns appear in the table).
+
+ String sql = "SELECT id, `name`, `date`, gender, comment FROM " +
tablePath;
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("id", MinorType.INT)
+ .add("name", MinorType.VARCHAR)
+ .addNullable("date", MinorType.DATE)
+ .add("gender", MinorType.VARCHAR)
+ .add("comment", MinorType.VARCHAR)
+ .buildSchema();
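+ // Column types come from the schema (e.g. `date` is DATE, not the VARCHAR the CSV text alone would give).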
+ expected1 = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1, "arina", new LocalDate(2019, 1, 18), "female", "ABC")
+ .addRow(2, "javan", new LocalDate(2019, 1, 19), "male", "ABC")
+ .addRow(4, "albert", new LocalDate(2019, 5, 4), "", "ABC")
+ .build();
+ expected2 = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(3, "bob", new LocalDate(2001, 1, 16), "NA", "ABC")
+ .build();
+
+ // Loop 10 times so that, as the two reader fragments read the two
+ // files, we end up with (acceptable) races that read the files in
+ // random order.
+
+ for (int i = 0; i < 10; i++) {
+ boolean sawSchema = false;
+ boolean sawFile1 = false;
+ boolean sawFile2 = false;
+ Iterator<DirectRowSet> iter =
client.queryBuilder().sql(sql).rowSetIterator();
+ while (iter.hasNext()) {
+ RowSet result = iter.next();
+ if (result.rowCount() == 3) {
+ sawFile1 = true;
+ new RowSetComparison(expected1).verifyAndClear(result);
+ } else if (result.rowCount() == 1) {
+ sawFile2 = true;
+ new RowSetComparison(expected2).verifyAndClear(result);
+ } else {
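+ // An empty batch carries only the schema.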
+ assertEquals(0, result.rowCount());
+ sawSchema = true;
+ }
+ }
+ assertTrue(sawSchema);
+ assertTrue(sawFile1);
+ assertTrue(sawFile2);
+ }
+ } finally {
+ if (expected1 != null) {
+ expected1.clear();
+ }
+ if (expected2 != null) {
+ expected2.clear();
+ }
+ client.resetSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY);
+ client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
+ client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY);
+ }
+ }
+
+ /**
+ * Test the schema we get in V2 when the table read order is random.
+ * Worst-case: the two files have different column counts and
+ * column orders.
+ * <p>
+ * Though the results are random, we iterate 10 times, which, in most
+ * runs, shows the random variation in schemas:
+ * <ul>
+ * <li>Sometimes the first batch has three columns, sometimes four.</li>
+ * <li>Sometimes the column `id` is in position 0, sometimes in position 1
+ * (correlated with the above).</li>
+ * <li>Sometimes the first file (with four columns) is returned first,
+ * sometimes the second file (with three columns) is returned
+ * first.</li>
+ * </ul>
+ */
+ @Test
+ public void testSchemaRaceV2() throws Exception {
+ try {
+ enableV3(false);
+ enableSchema(false);
+ enableMultiScan();
+ String tablePath = buildTwoFileTable("schemaRaceV2", multi1Contents,
reordered2Contents);
+ boolean sawFile1First = false;
+ boolean sawFile2First = false;
+ boolean sawFullSchema = false;
+ boolean sawPartialSchema = false;
+ boolean sawIdAsCol0 = false;
+ boolean sawIdAsCol1 = false;
+ String sql = "SELECT * FROM " + tablePath;
+ for (int i = 0; i < 10; i++) {
+ Iterator<DirectRowSet> iter =
client.queryBuilder().sql(sql).rowSetIterator();
+ int batchCount = 0;
+ while (iter.hasNext()) {
+ batchCount++;
+ RowSet result = iter.next();
+ TupleMetadata resultSchema = result.schema();
+ if (resultSchema.size() == 4) {
+ sawFullSchema = true;
+ } else {
+ assertEquals(3, resultSchema.size());
+ sawPartialSchema = true;
+ }
+ if (resultSchema.index("id") == 0) {
+ sawIdAsCol0 = true;
+ } else {
+ assertEquals(1, resultSchema.index("id"));
+ sawIdAsCol1 = true;
+ }
+ if (batchCount == 1) {
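+ // The first data batch tells us which file won the race; the V2 reader returns `id` as VARCHAR, so compare string values.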
+ RowSetReader reader = result.reader();
+ assertTrue(reader.next());
+ String id = reader.scalar("id").getString();
+ if (id.equals("1")) {
+ sawFile1First = true;
+ } else {
+ assertEquals("3", id);
+ sawFile2First = true;
+ }
+ }
+ result.clear();
+ }
+ }
+
+ // The outcome is random (which is the key problem). Don't assert on these
+ // flags because doing so can lead to a flaky test.
+
+ if (!sawFile1First || !sawFile2First || !sawFullSchema || !sawPartialSchema || !sawIdAsCol0 || !sawIdAsCol1) {
+ System.out.println("Some variations did not occur");
+ System.out.println(String.format("File 1 first: %s", sawFile1First));
+ System.out.println(String.format("File 1 second: %s", sawFile2First));
+ System.out.println(String.format("Full schema: %s", sawFullSchema));
+ System.out.println(String.format("Partial schema: %s",
sawPartialSchema));
+ System.out.println(String.format("`id` as col 0: %s", sawIdAsCol0));
+ System.out.println(String.format("`id` as col 1: %s", sawIdAsCol1));
+ }
+ // Sanity checks
+ assertTrue(sawFullSchema);
+ assertTrue(sawFile1First || sawFile2First);
+ assertTrue(sawIdAsCol0 || sawIdAsCol1);
+ } finally {
+ resetV3();
+ resetSchema();
+ resetMultiScan();
+ }
+ }
+
+ /**
+ * Show that, without a schema, the hard schema change for the "missing"
+ * gender column causes an error in the sort operator when it is presented
+ * with one batch in which gender is VARCHAR and another in which it is
+ * nullable INT. This is a consequence of using SELECT * on a distributed
+ * scan.
+ */
+ @Test
+ public void testWildcardSortFailure() throws Exception {
+ try {
+ enableSchema(false);
+ enableMultiScan();
+ enableV3(false);
+ String tablePath = buildTwoFileTable("wildcardSortV2", multi1Contents,
reordered2Contents);
+ doTestWildcardSortFailure(tablePath);
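+ // Repeat with the V3 reader: without a schema, the same conflict can occur.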
+ enableV3(true);
+ doTestWildcardSortFailure(tablePath);
+ } finally {
+ resetV3();
+ resetSchema();
+ resetMultiScan();
+ }
+ }
+
+ private void doTestWildcardSortFailure(String tablePath) throws Exception {
+ String sql = "SELECT * FROM " + tablePath + " ORDER BY id";
+ boolean sawError = false;
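+ // The race does not trigger the conflict on every run, so retry up to 10 times.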
+ for (int i = 0; i < 10; i++) {
+ try {
+ // When this fails it will print a nasty stack trace.
+ RowSet result = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(4, result.rowCount());
+ result.clear();
+ } catch (RpcException e) {
+ assertTrue(e.getCause() instanceof UserRemoteException);
+ sawError = true;
+ break;
+ }
+ }
+ assertTrue(sawError);
+ }
+
+ /**
+ * Test an explicit projection with a sort. The sort 1) will blow up
+ * if the internal schema is inconsistent, and 2) allows easier verification
+ * of the merged table results.
+ * <p>
+ * Fails with <code><pre>
+ * #: id, name, gender
+ * 0: "1", "bob