urosstan-db commented on code in PR #50921:
URL: https://github.com/apache/spark/pull/50921#discussion_r2126593202


##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -9403,6 +9403,11 @@
       "The number of fields (<numFields>) in the partition identifier is not equal to the partition schema length (<schemaLen>). The identifier might not refer to one partition."
     ]
   },
+  "UNSUPPORTED_JOIN_TYPES": {
+    "message" : [
+      "Unsupported join type: <joinType>"
+    ]
+  },

Review Comment:
   We should be more precise here, e.g. `UNSUPPORTED_JOIN_TYPE_IN_CONNECTOR_EXPRESSION` or something like that.
   Also, it would be great if the message listed the supported join types.
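   A minimal sketch of the reworded condition, assuming the suggested name `UNSUPPORTED_JOIN_TYPE_IN_CONNECTOR_EXPRESSION` and a new, hypothetical `<supportedJoinTypes>` parameter. The `render` helper only mimics how `<param>` placeholders in `error-conditions.json` templates are filled; it is not Spark's error framework, and the class name is illustrative. Because `translateJoinType` in this PR maps only `Inner`, the supported list currently holds just `INNER`.

   ```java
   import java.util.List;
   import java.util.Map;

   // Hypothetical helper; Spark resolves real templates from error-conditions.json.
   final class UnsupportedJoinTypeError {
     // Assumed condition name, following the review suggestion.
     static final String CONDITION = "UNSUPPORTED_JOIN_TYPE_IN_CONNECTOR_EXPRESSION";

     // Assumed template; <supportedJoinTypes> is a hypothetical new parameter.
     static final String TEMPLATE =
         "Unsupported join type <joinType> in connector expression. "
             + "Supported join types: <supportedJoinTypes>.";

     // translateJoinType in this PR only maps Inner, so INNER is the only entry for now.
     static final List<String> SUPPORTED_JOIN_TYPES = List.of("INNER");

     // Builds the message parameters, including the list of supported join types.
     static Map<String, String> parameters(String joinType) {
       return Map.of(
           "joinType", joinType,
           "supportedJoinTypes", String.join(", ", SUPPORTED_JOIN_TYPES));
     }

     // Replaces each <name> placeholder in the template with its parameter value.
     static String render(String template, Map<String, String> params) {
       String result = template;
       for (Map.Entry<String, String> e : params.entrySet()) {
         result = result.replace("<" + e.getKey() + ">", e.getValue());
       }
       return result;
     }
   }
   ```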



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/JoinTypeSQLBuilder.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.util;
+
+import org.apache.spark.SparkUnsupportedOperationException;
+import org.apache.spark.sql.connector.join.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The builder to generate SQL for specific Join type.
+ *
+ * @since 4.0.0
+ */
+public class JoinTypeSQLBuilder {

Review Comment:
   Do we need this class? Could the `compileJoin` method be implemented without it?
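   One possible reading of the suggestion, as a sketch: map the join type to SQL directly inside a `compileJoin`-style method. The real `compileJoin` signature is not shown in this diff, so the one below (pre-compiled left/right SQL, a join type and a condition string) is hypothetical, and `JoinType`/`InnerJoinType` are local stand-ins for the connector types.

   ```java
   // Local stand-ins for the connector JoinType hierarchy introduced in this PR.
   interface JoinType {}

   final class InnerJoinType implements JoinType {}

   final class JoinCompiler {
     // Maps a connector join type to its SQL keyword without a dedicated builder class.
     static String joinTypeToSQL(JoinType joinType) {
       if (joinType instanceof InnerJoinType) {
         return "INNER JOIN";
       }
       throw new UnsupportedOperationException(
           "Unsupported join type: " + joinType.getClass().getName());
     }

     // Hypothetical signature: both sides and the condition are already compiled to SQL.
     static String compileJoin(String left, String right, JoinType joinType, String condition) {
       return left + " " + joinTypeToSQL(joinType) + " " + right + " ON " + condition;
     }
   }
   ```

   A separate builder class would mainly pay off if dialects need to override the join-type SQL on its own; otherwise the mapping fits in a few lines.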



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/Inner.java:
##########
@@ -0,0 +1,23 @@
+
+package org.apache.spark.sql.connector.join;
+
+import org.apache.spark.annotation.Evolving;
+
+@Evolving

Review Comment:
   You could also use the `@DeveloperApi` annotation if you prefer it; it marks the class as a lower-level, unstable API that may change or be removed in minor releases, so no one should rely on it.



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/JoinColumn.java:
##########
@@ -0,0 +1,48 @@
+
+package org.apache.spark.sql.connector.join;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Expression;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+
+/**
+ * Represents a column reference used in DSv2 Join pushdown.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public final class JoinColumn implements NamedReference {

Review Comment:
   A Java record seems like a better fit here.
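   A rough sketch of `JoinColumn` as a record, assuming Java 17 (the minimum for Spark 4.0). The components (`qualifier`, `name`) are guesses since the class body is not in this diff, and `NamedRef` is a trimmed stand-in for Spark's `NamedReference` that keeps only `fieldNames()`. The qualifier is stored as an immutable `List` rather than a `String[]`, because a record's generated `equals`/`hashCode` compare array components by reference.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Trimmed stand-in for org.apache.spark.sql.connector.expressions.NamedReference.
   interface NamedRef {
     String[] fieldNames();
   }

   // Hypothetical components; equals, hashCode and toString are generated by the compiler.
   record JoinColumn(List<String> qualifier, String name) implements NamedRef {
     // Compact constructor: copy into an immutable list so the record stays immutable.
     JoinColumn {
       qualifier = List.copyOf(qualifier);
     }

     // Full path of the column: the qualifier parts followed by the column name.
     @Override
     public String[] fieldNames() {
       List<String> parts = new ArrayList<>(qualifier);
       parts.add(name);
       return parts.toArray(new String[0]);
     }
   }
   ```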



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala:
##########
@@ -189,6 +189,12 @@ case class RowDataSourceScanExec(
       seqToString(markedFilters.toSeq)
     }
 
+    val pushedJoins = if (pushedDownOperators.pushedJoins.nonEmpty) {
+      Map("PushedJoins" -> seqToString(pushedDownOperators.pushedJoins))
+    } else {
+      Map()
+    }

Review Comment:
   It may be fine to display this unconditionally, since that makes debugging easier, but I am fine with this approach as well.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##########
@@ -501,6 +503,13 @@ object DataSourceStrategy
     }
   }
 
+  def translateJoinType(joinType: JoinType): Option[V2JoinType] = {
+    joinType match {
+      case Inner => Some(new V2Inner)
+      case _ => None
+    }
+  }

Review Comment:
   I know there are already a bunch of pushdown methods here, but I am not sure this is a good approach. Is there a better place for this one?



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownJoin.java:
##########
@@ -0,0 +1,47 @@
+
+package org.apache.spark.sql.connector.read;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.connector.join.JoinType;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
+ * push down join operators.
+ *
+ * @since 4.0.0
+ */
+@Evolving
+public interface SupportsPushDownJoin extends ScanBuilder {
+  boolean isRightSideCompatibleForJoin(SupportsPushDownJoin other);
+
+  boolean pushJoin(
+    SupportsPushDownJoin other,
+    JoinType joinType,
+    Optional<Predicate> condition,
+    StructType leftRequiredSchema,
+    StructType rightRequiredSchema
+    );
+
+  // When join is pushed down, the output schema can be changed in case of duplicate column names.

Review Comment:
   We should write Javadoc here describing the method's output; plain comments are not visible without the source code (e.g. when only the compiled JAR is available).
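   A sketch of the kind of Javadoc meant here, written against stand-in types so it compiles on its own (`ScanBuilderStub`, `JoinTypeStub`, `PredicateStub` and `SchemaStub` replace Spark's `ScanBuilder`, `JoinType`, `Predicate` and `StructType`). The documented semantics are assumptions inferred from the method names, not confirmed by the PR: a `false` return from `pushJoin` is taken to mean Spark keeps executing the join itself. The method after the schema comment is cut off in this diff, so it is left out. `JdbcLikeScanBuilder` is a hypothetical implementation used only to exercise the contract.

   ```java
   import java.util.Optional;

   // Stand-ins for Spark's ScanBuilder, JoinType, Predicate and StructType.
   interface ScanBuilderStub {}
   interface JoinTypeStub {}
   interface PredicateStub {}
   interface SchemaStub {}

   /** A mix-in interface for scan builders that can push down join operators. */
   interface SupportsPushDownJoinSketch extends ScanBuilderStub {
     /**
      * Checks whether {@code other} can be the right side of a join pushed into this scan.
      *
      * @param other the scan builder of the right side of the join
      * @return {@code true} if both sides can be joined by the same data source
      */
     boolean isRightSideCompatibleForJoin(SupportsPushDownJoinSketch other);

     /**
      * Pushes a join with {@code other} into this scan builder.
      *
      * @param other the scan builder of the right side
      * @param joinType the join type, for example an inner join
      * @param condition the join condition, if any
      * @param leftRequiredSchema the columns required from the left side
      * @param rightRequiredSchema the columns required from the right side
      * @return {@code true} if the join was pushed down; {@code false} if Spark must execute it
      */
     boolean pushJoin(
         SupportsPushDownJoinSketch other,
         JoinTypeStub joinType,
         Optional<PredicateStub> condition,
         SchemaStub leftRequiredSchema,
         SchemaStub rightRequiredSchema);
   }

   // Hypothetical implementation: joins are pushable only between scans of the same database URL.
   final class JdbcLikeScanBuilder implements SupportsPushDownJoinSketch {
     private final String url;
     private boolean joinPushed = false;

     JdbcLikeScanBuilder(String url) {
       this.url = url;
     }

     @Override
     public boolean isRightSideCompatibleForJoin(SupportsPushDownJoinSketch other) {
       // Pattern matching for instanceof requires Java 16+.
       return other instanceof JdbcLikeScanBuilder o && o.url.equals(url);
     }

     @Override
     public boolean pushJoin(
         SupportsPushDownJoinSketch other,
         JoinTypeStub joinType,
         Optional<PredicateStub> condition,
         SchemaStub leftRequiredSchema,
         SchemaStub rightRequiredSchema) {
       if (!isRightSideCompatibleForJoin(other)) {
         return false; // Spark keeps the join in its own plan.
       }
       joinPushed = true; // A real source would record the join here to generate SQL later.
       return true;
     }

     boolean joinPushed() {
       return joinPushed;
     }
   }
   ```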



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/join/Inner.java:
##########
@@ -0,0 +1,23 @@
+
+package org.apache.spark.sql.connector.join;
+
+import org.apache.spark.annotation.Evolving;
+
+@Evolving
+public final class Inner implements JoinType { }

Review Comment:
   It would be good to be more specific here; `Inner` is too generic a name.
   ```suggestion
   public final class InnerJoinType implements JoinType { }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
