Here are my notes from this week’s DSv2 sync. *Attendees*:
Ryan Blue
Holden Karau
Russell Spitzer
Terry Kim
Wenchen Fan
Shiv Prashant Sood
Joseph Torres
Gengliang Wang
Matt Cheah
Burak Yavuz

*Topics*:
- Driver-side Hadoop conf
- SHOW DATABASES/NAMESPACES behavior
- Review outstanding 3.0 work
- Spark 2.5?
- Open PRs
  - Centralize catalog and table lookup: https://github.com/apache/spark/pull/25747
  - Update TableProvider for catalogs: https://github.com/apache/spark/pull/25651
  - Partitioning in DataFrameWriter.save (multiple PRs)
  - USE CATALOG / SET CURRENT CATALOG: https://github.com/apache/spark/pull/25771
  - UPDATE: https://github.com/apache/spark/pull/25626

*Discussion*:
- Driver-side Hadoop conf
  - Holden: DSv1 propagates the Hadoop conf, but getting at it is a pain in DSv2, which leads to sources building multiple conf instances. It would be nice for DSv2 to pass it.
  - Ryan: Iceberg uses it and serializes it into every task; it would be nice to avoid that.
  - Wenchen: How would we support this?
  - Holden: DSv1 generates it on the driver; DSv2 could use a mix-in.
  - Ryan: Part of the complication is that it is produced per stage with the current Spark config.
  - Matt: There is a SerializableConfiguration in Iceberg, too. The per-task overhead is fine, but we could use a broadcast to avoid it (a sketch follows after this section).
  - Holden will create a PR to expose SerializableConfiguration and one to add a way to pass the config to the source.
- SHOW NAMESPACES vs SHOW DATABASES
  - Ryan: These are two separate code paths right now. We chose to make SHOW NAMESPACES compatible with SHOW DATABASES, so we can either choose a different behavior or combine the two implementations.
  - Wenchen: We should merge the two implementations.
  - Terry will submit a PR to merge the implementations.
- *Outstanding work before 3.0*
  - Ryan: We are about ready for 3.0, but we should make sure to focus on the remaining things that need to be done. I'd like to get an idea of what needs to be done by 3.0 and what is nice-to-have.
  - Wenchen: Dynamic partition push-down is in 3.0; will review to see whether DSv2 is compatible.
  - Ryan: It is a little late to make this a 3.0 goal. DSv2 works well without it, and I think it could be added in 3.1.
  - Consensus was that *dynamic push-down is nice-to-have and should not block the release*.
  - Ryan: Is REFRESH TABLE needed? File sources needed this, but it was never implemented.
  - Wenchen: Refresh should not block 3.0; file sources on v2 are not a 3.0 goal.
  - Consensus was *not to block on adding refresh (nice to have)*.
  - *Final list*:
    - Finish the TableProvider update to avoid another API change: pass all table config from the metastore (Wenchen)
    - Catalog behavior fix: https://issues.apache.org/jira/browse/SPARK-29014 (Ryan)
    - Stats push-down fix (Ryan)
    - Make DataFrameWriter compatible when updating a source from v1 to v2 (Burak)
- Spark 2.5 release?
  - Ryan: We are very close to feature complete for a release, but 3.0 is still a few months away. I'm going to be backporting DSv2 onto Spark 2.4 and could contribute that work for a 2.5 release. Lots of people have been asking when DSv2 will be available, and since it isn't a breaking change we don't need to wait until 3.0.
  - Wenchen: It would be good to have a 2.x release with the same DSv2 support as 3.0 so source authors only need to support one DSv2 API. There have been substantial changes to it since 2.4.
  - Ryan: Good point about compatibility; that would make using DSv2 much easier.
  - Holden: It would also be helpful to support Java 11 for the same reason.
  - Ryan: I think it makes sense to do a compatibility release in preparation for 3.0, then. I'll bring this up on the dev list.
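For reference on the broadcast idea in the Hadoop conf discussion above, here is a minimal sketch. It assumes a wrapper equivalent to Spark's internal SerializableConfiguration (which is not yet exposed to sources); the SerializableHadoopConf class and the toy job are illustrative stand-ins, not the proposed API.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

// Stand-in for Spark's internal SerializableConfiguration: Hadoop's Configuration
// is not java.io.Serializable, so it is written and read with its Writable methods.
class SerializableHadoopConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

val spark = SparkSession.builder().getOrCreate()

// Broadcast the driver-side Hadoop conf once per job instead of serializing it
// into every task closure.
val confBroadcast: Broadcast[SerializableHadoopConf] =
  spark.sparkContext.broadcast(
    new SerializableHadoopConf(spark.sparkContext.hadoopConfiguration))

spark.sparkContext.parallelize(1 to 4).foreach { _ =>
  // Each task reads the conf from the broadcast value.
  val hadoopConf: Configuration = confBroadcast.value.value
  // ... open readers/writers with hadoopConf ...
}
```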
- DataFrameWriterV2 Python API
  - Ryan: I opened SPARK-29157 to add support for the new DataFrameWriterV2 API in Python. This is probably a good starter issue if anyone wants to work on it.
  - Holden would like to work on this.
- Centralize catalog and table lookup: https://github.com/apache/spark/pull/25747
  - Wenchen: This refactor cleans up the default/current catalog and also separates catalog resolution from table resolution.
  - Ryan: One of my main concerns is not having conversion in rules, but in the ParsedStatement plans themselves.
  - Wenchen: The PR has been updated; that is no longer the case and it uses extractors.
  - Ryan: Will take another look, then. Also, I'd like to keep the rules in Analyzer specific to DSv2 and keep the v1 fallback rules in DataSourceResolution. That way fallback is a special case and we can remove it without rewriting the v2 rules.
  - Wenchen: That would also allow us to keep the v2 rules in catalyst instead of in sql-core; will make the change.
- Partitioning in DataFrameWriter.save
  - Burak: DataFrameWriter.save uses a different default SaveMode, which changes behavior when an implementation moves from v1 to v2. The idea behind these PRs is to allow a TableProvider implementation to provide create/exists/drop.
  - Ryan: This line of reasoning is why we introduced the TableCatalog API: to implement CTAS, we need to create a table; to make that reliable, we need to know whether the table already exists; and to handle failure, we need to be able to drop the table if the write fails. The progression of PRs mirrors this: the first adds create and exists, the second adds drop.
  - Ryan: I think it is helpful to re-frame the problem. The issue is that a read/write implementation loads a specific class, which doesn't have an associated catalog because there was only one catalog when this was designed. The read/write implementation class alone can't determine the right catalog to use. To go from a read/write implementation to a catalog, we have 3 options:
    - Don't allow operations that require a catalog, i.e. no CTAS or ErrorIfExists mode. This is what we currently do, but moving from v1 to v2 means the default changes to append, or fails because ErrorIfExists isn't supported.
    - Always use spark_catalog, because that's what has always been used before.
    - Allow the source to extract the catalog from the DataFrameReader/DataFrameWriter options.
  - Matt: I like the third option, where the implementation can extract a catalog from the read/write options.
  - Burak: That option would allow using all of the same DSv2 plans: use the options to get a catalog and a table identifier.
  - Consensus was to go with the *third option: add extractCatalogName(Options) and extractIdentifier(Options)* (a sketch follows at the end of these notes).
  - There was also some discussion about path-based tables, but these are out of scope because their behavior is not yet defined.
  - Ryan: Going with option 3 still requires Wenchen's changes to TableProvider to pass all the table information. The DataFrame API will get a catalog and identifier pair, but instantiating a Table instance in a generic metastore still requires the buildTable method.
- TableProvider PR: https://github.com/apache/spark/pull/25651
  - Wenchen: Should this use multiple methods or a single method with Optional args?
  - Ryan: No strong opinion.
- Ryan: Implementing REFRESH TABLE for v2 is another good starter project.

--
Ryan Blue
Software Engineer
Netflix
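The exact interface for the third option was not settled in the meeting, so the following is only a minimal sketch using the method names from the discussion. The trait name, the option keys, and the use of CaseInsensitiveStringMap are assumptions, and the imports assume the Spark 3.0 connector package layout.

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, TableProvider}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical mix-in for TableProvider implementations: the source inspects the
// DataFrameReader/DataFrameWriter options and reports which catalog and table
// identifier the read or write should resolve against, so the existing DSv2
// catalog plans (CTAS, ErrorIfExists handling, etc.) can be reused.
trait SupportsCatalogExtraction extends TableProvider {

  // Name of the catalog to load from the session's catalog manager,
  // e.g. taken from an option such as "catalog" (illustrative key).
  def extractCatalogName(options: CaseInsensitiveStringMap): String

  // Namespace and table name within that catalog,
  // e.g. parsed from an option such as "table" (illustrative key).
  def extractIdentifier(options: CaseInsensitiveStringMap): Identifier
}
```

With something like this, DataFrameWriter.save could look up the named catalog and identifier from the options and then plan the same v2 create/replace logical plans it already uses for catalog tables.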