singhpk234 commented on code in PR #9830:
URL: https://github.com/apache/iceberg/pull/9830#discussion_r1578549234
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java:
##########
@@ -557,6 +568,88 @@ public View loadView(Identifier ident) throws
NoSuchViewException {
throw new NoSuchViewException(ident);
}
+  // Candidate to be moved to org.apache.iceberg.view.View
+  /**
+   * Returns whether the given Iceberg view is flagged as a materialized view.
+   *
+   * <p>The flag is read from {@link MaterializedViewUtil#MATERIALIZED_VIEW_PROPERTY_KEY} in the
+   * view properties. A view without that property (i.e. any regular view) is reported as not
+   * materialized; the value is compared to {@code "true"} ignoring case.
+   *
+   * @param view the Iceberg view to inspect
+   * @return {@code true} only if the materialized-view property is present and equals "true"
+   */
+  private boolean isMaterializedView(org.apache.iceberg.view.View view) {
+    // Map.get returns null for regular views; Boolean.parseBoolean(null) is false, whereas
+    // Optional.of(null) would throw a NullPointerException here.
+    return Boolean.parseBoolean(
+        view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY));
+  }
+
+  // Candidate to be moved to org.apache.iceberg.view.View
+  /**
+   * Reads the identifier of the table that stores a materialized view's data.
+   *
+   * @param view the Iceberg view, expected to be a materialized view
+   * @return the raw storage-table identifier string taken from the view properties
+   * @throws IllegalStateException if the storage-table property is not present on the view
+   */
+  private String getStorageTableIdentifier(org.apache.iceberg.view.View view) {
+    String storageTableId =
+        view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY);
+    if (storageTableId == null) {
+      throw new IllegalStateException("Storage table identifier is not set for materialized view.");
+    }
+    return storageTableId;
+  }
+
+  // Candidate to be moved to org.apache.iceberg.view.View but requires loadTable
+  /**
+   * Loads the table that backs a materialized view, using this catalog's {@code loadTable}.
+   *
+   * <p>The storage-table identifier stored on the view is parsed against the active Spark session.
+   *
+   * @param view the materialized view whose storage table should be loaded
+   * @return the loaded storage table
+   * @throws IllegalStateException if the storage-table identifier is missing, cannot be parsed, or
+   *     does not resolve to an existing table; the message names the offending identifier
+   */
+  private Table loadStorageTable(org.apache.iceberg.view.View view) {
+    String storageTableIdentifier = getStorageTableIdentifier(view);
+    try {
+      // NOTE(review): only the identifier part of the parse result is used and the table is loaded
+      // from this catalog, so any catalog prefix in the stored identifier is effectively ignored;
+      // confirm that is intended.
+      return loadTable(
+          Spark3Util.catalogAndIdentifier(SparkSession.active(), storageTableIdentifier)
+              .identifier());
+    } catch (ParseException | NoSuchTableException e) {
+      throw new IllegalStateException(
+          "Unable to load storage table " + storageTableIdentifier + " for materialized view.", e);
+    }
+  }
+
+ // Candidate to be moved to org.apache.iceberg.view.View but requires
loadTable
+ // Second option is to move to SparkMaterializedView
+ private boolean isFresh(org.apache.iceberg.view.View view) {
+ Table storageTable = loadStorageTable(view);
+ Map<String, String> storageTableProperties = storageTable.properties();
+
+ // Get the parent view version id from the storage table properties
+ String storageTableViewVersionIdPropertyValue =
+ storageTableProperties.get(
+ MaterializedViewUtil.MATERIALIZED_VIEW_VERSION_PROPERTY_KEY);
+ if (storageTableViewVersionIdPropertyValue == null) {
+ throw new IllegalStateException(
+ "Storage table properties do not contain the virtual view version id
property.");
+ }
+ int storageTableViewVersionId =
Integer.parseInt(storageTableViewVersionIdPropertyValue);
+
+ // If the storage table view version id is different from the current
version id, the
+ // materialized view is not fresh
+ if (storageTableViewVersionId != view.currentVersion().versionId()) {
+ return false;
+ }
+
+ // Get the base table snapshot ids from the storage table properties
+ Map<String, String> baseTableSnapshotsProperties =
+ storageTableProperties.entrySet().stream()
+ .filter(
+ entry ->
+ entry
+ .getKey()
+ .startsWith(
+ MaterializedViewUtil
+
.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ List<Table> baseTables =
MaterializedViewUtil.extractBaseTables(view.sqlFor("spark").sql());
+
+ for (Table baseTable : baseTables) {
+ org.apache.iceberg.Table icebergBaseTable = ((SparkTable)
baseTable).table();
+ String snapshotId =
+ String.valueOf(
+ icebergBaseTable.currentSnapshot() == null
+ ? 0
+ : icebergBaseTable.currentSnapshot().snapshotId());
+ if (!baseTableSnapshotsProperties
Review Comment:
I was wondering, if it's not too much work, whether it makes sense to make the freshness
check configurable. This would give the view creator more control over the freshness check
and avoid unnecessary reloads that are triggered only because a base table's current
snapshot ID changed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]