This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f671053dfa Spark 3.3: Add months transform function (#6261)
f671053dfa is described below
commit f671053dfa8960eb85ae8531e56daf9ee2fe7fac
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Sat Nov 26 13:31:54 2022 -0800
Spark 3.3: Add months transform function (#6261)
---
.../iceberg/spark/functions/MonthsFunction.java | 118 +++++++++++++++++++++
.../iceberg/spark/functions/SparkFunctions.java | 1 +
.../iceberg/spark/sql/TestSparkMonthsFunction.java | 115 ++++++++++++++++++++
3 files changed, 234 insertions(+)
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/MonthsFunction.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/MonthsFunction.java
new file mode 100644
index 0000000000..c073c048a5
--- /dev/null
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/MonthsFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.TimestampType;
+
+/**
+ * A Spark function implementation for the Iceberg month transform.
+ *
+ * <p>Example usage: {@code SELECT system.months('source_col')}.
+ */
+public class MonthsFunction extends UnaryUnboundFunction {
+
+ @Override
+ protected BoundFunction doBind(DataType valueType) {
+ if (valueType instanceof DateType) {
+ return new DateToMonthsFunction();
+ } else if (valueType instanceof TimestampType) {
+ return new TimestampToMonthsFunction();
+ } else {
+ throw new UnsupportedOperationException(
+ "Expected value to be date or timestamp: " +
valueType.catalogString());
+ }
+ }
+
+ @Override
+ public String description() {
+ return name()
+ + "(col) - Call Iceberg's month transform\n"
+ + " col :: source column (must be date or timestamp)";
+ }
+
+ @Override
+ public String name() {
+ return "months";
+ }
+
+ private abstract static class BaseToMonthsFunction implements
ScalarFunction<Integer> {
+ @Override
+ public String name() {
+ return "months";
+ }
+
+ @Override
+ public DataType resultType() {
+ return DataTypes.IntegerType;
+ }
+ }
+
+ public static class DateToMonthsFunction extends BaseToMonthsFunction {
+ // magic method used in codegen
+ public static int invoke(int days) {
+ return DateTimeUtil.daysToMonths(days);
+ }
+
+ @Override
+ public DataType[] inputTypes() {
+ return new DataType[] {DataTypes.DateType};
+ }
+
+ @Override
+ public String canonicalName() {
+ return "iceberg.months(date)";
+ }
+
+ @Override
+ public Integer produceResult(InternalRow input) {
+ // return null for null input to match what Spark does in codegen
+ return input.isNullAt(0) ? null : invoke(input.getInt(0));
+ }
+ }
+
+ public static class TimestampToMonthsFunction extends BaseToMonthsFunction {
+ // magic method used in codegen
+ public static int invoke(long micros) {
+ return DateTimeUtil.microsToMonths(micros);
+ }
+
+ @Override
+ public DataType[] inputTypes() {
+ return new DataType[] {DataTypes.TimestampType};
+ }
+
+ @Override
+ public String canonicalName() {
+ return "iceberg.months(timestamp)";
+ }
+
+ @Override
+ public Integer produceResult(InternalRow input) {
+ // return null for null input to match what Spark does in codegen
+ return input.isNullAt(0) ? null : invoke(input.getLong(0));
+ }
+ }
+}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/SparkFunctions.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/SparkFunctions.java
index aa71a02a22..db05a8c313 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/SparkFunctions.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/SparkFunctions.java
@@ -33,6 +33,7 @@ public class SparkFunctions {
ImmutableMap.of(
"iceberg_version", new IcebergVersionFunction(),
"years", new YearsFunction(),
+ "months", new MonthsFunction(),
"bucket", new BucketFunction(),
"truncate", new TruncateFunction());
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkMonthsFunction.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkMonthsFunction.java
new file mode 100644
index 0000000000..b88bf00256
--- /dev/null
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkMonthsFunction.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.iceberg.spark.functions.MonthsFunction;
+import org.apache.spark.sql.AnalysisException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSparkMonthsFunction extends SparkTestBaseWithCatalog {
+
+ @Before
+ public void useCatalog() {
+ sql("USE %s", catalogName);
+ }
+
+ @Test
+ public void testDates() {
+ Assert.assertEquals(
+ "Expected to produce 47 * 12 + 11 = 575",
+ 575,
+ scalarSql("SELECT system.months(date('2017-12-01'))"));
+ Assert.assertEquals(
+ "Expected to produce 0 * 12 + 0 = 0",
+ 0,
+ scalarSql("SELECT system.months(date('1970-01-01'))"));
+ Assert.assertEquals(
+ "Expected to produce -1", -1, scalarSql("SELECT
system.months(date('1969-12-31'))"));
+ Assert.assertNull(scalarSql("SELECT system.months(CAST(null AS DATE))"));
+ }
+
+ @Test
+ public void testTimestamps() {
+ Assert.assertEquals(
+ "Expected to produce 47 * 12 + 11 = 575",
+ 575,
+ scalarSql("SELECT system.months(TIMESTAMP '2017-12-01 10:12:55.038194
UTC+00:00')"));
+ Assert.assertEquals(
+ "Expected to produce 0 * 12 + 0 = 0",
+ 0,
+ scalarSql("SELECT system.months(TIMESTAMP '1970-01-01 00:00:01.000001
UTC+00:00')"));
+ Assert.assertEquals(
+ "Expected to produce -1",
+ -1,
+ scalarSql("SELECT system.months(TIMESTAMP '1969-12-31 23:59:58.999999
UTC+00:00')"));
+ Assert.assertNull(scalarSql("SELECT system.months(CAST(null AS
TIMESTAMP))"));
+ }
+
+ @Test
+ public void testWrongNumberOfArguments() {
+ AssertHelpers.assertThrows(
+ "Function resolution should not work with zero arguments",
+ AnalysisException.class,
+ "Function 'months' cannot process input: (): Wrong number of inputs",
+ () -> scalarSql("SELECT system.months()"));
+
+ AssertHelpers.assertThrows(
+ "Function resolution should not work with more than one argument",
+ AnalysisException.class,
+ "Function 'months' cannot process input: (date, date): Wrong number of
inputs",
+ () -> scalarSql("SELECT system.months(date('1969-12-31'),
date('1969-12-31'))"));
+ }
+
+ @Test
+ public void testInvalidInputTypes() {
+ AssertHelpers.assertThrows(
+ "Int type should not be coercible to date/timestamp",
+ AnalysisException.class,
+ "Function 'months' cannot process input: (int): Expected value to be
date or timestamp",
+ () -> scalarSql("SELECT system.months(1)"));
+
+ AssertHelpers.assertThrows(
+ "Long type should not be coercible to date/timestamp",
+ AnalysisException.class,
+ "Function 'months' cannot process input: (bigint): Expected value to
be date or timestamp",
+ () -> scalarSql("SELECT system.months(1L)"));
+ }
+
+ @Test
+ public void testThatMagicFunctionsAreInvoked() {
+ String dateValue = "date('2017-12-01')";
+ String dateTransformClass =
MonthsFunction.DateToMonthsFunction.class.getName();
+ Assertions.assertThat(scalarSql("EXPLAIN EXTENDED SELECT
system.months(%s)", dateValue))
+ .asString()
+ .isNotNull()
+ .contains("staticinvoke(class " + dateTransformClass);
+
+ String timestampValue = "TIMESTAMP '2017-12-01 10:12:55.038194 UTC+00:00'";
+ String timestampTransformClass =
MonthsFunction.TimestampToMonthsFunction.class.getName();
+ Assertions.assertThat(scalarSql("EXPLAIN EXTENDED SELECT
system.months(%s)", timestampValue))
+ .asString()
+ .isNotNull()
+ .contains("staticinvoke(class " + timestampTransformClass);
+ }
+}