Repository: spark Updated Branches: refs/heads/master a2f4cdceb -> cb2d2e158
[SPARK-9758] [TEST] [SQL] Compilation issue for hive test / wrong package? Move `test.org.apache.spark.sql.hive` package tests to apparent intended `org.apache.spark.sql.hive` as they don't intend to test behavior from outside org.apache.spark.* Alternate take, per discussion at https://github.com/apache/spark/pull/8051 I think this is what vanzin and I had in mind but also CC rxin to cross-check, as this does indeed depend on whether these tests were accidentally in this package or not. Testing from a `test.org.apache.spark` package is legitimate but didn't seem to be the intent here. Author: Sean Owen <so...@cloudera.com> Closes #8307 from srowen/SPARK-9758. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb2d2e15 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb2d2e15 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb2d2e15 Branch: refs/heads/master Commit: cb2d2e15844d7ae34b5dd7028b55e11586ed93fa Parents: a2f4cdc Author: Sean Owen <so...@cloudera.com> Authored: Mon Aug 24 22:35:21 2015 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Mon Aug 24 22:35:21 2015 +0100 ---------------------------------------------------------------------- .../spark/sql/hive/JavaDataFrameSuite.java | 104 ++++++++++++ .../sql/hive/JavaMetastoreDataSourcesSuite.java | 162 ++++++++++++++++++ .../spark/sql/hive/aggregate/MyDoubleAvg.java | 129 +++++++++++++++ .../spark/sql/hive/aggregate/MyDoubleSum.java | 118 ++++++++++++++ .../sql/hive/execution/UDFIntegerToString.java | 26 +++ .../sql/hive/execution/UDFListListInt.java | 47 ++++++ .../spark/sql/hive/execution/UDFListString.java | 38 +++++ .../sql/hive/execution/UDFStringString.java | 26 +++ .../sql/hive/execution/UDFTwoListList.java | 28 ++++ .../spark/sql/hive/JavaDataFrameSuite.java | 106 ------------ .../sql/hive/JavaMetastoreDataSourcesSuite.java | 163 ------------------- .../spark/sql/hive/aggregate/MyDoubleAvg.java | 129 
--------------- .../spark/sql/hive/aggregate/MyDoubleSum.java | 118 -------------- .../sql/hive/execution/UDFIntegerToString.java | 26 --- .../sql/hive/execution/UDFListListInt.java | 47 ------ .../spark/sql/hive/execution/UDFListString.java | 38 ----- .../sql/hive/execution/UDFStringString.java | 26 --- .../sql/hive/execution/UDFTwoListList.java | 28 ---- .../hive/execution/AggregationQuerySuite.scala | 2 +- 19 files changed, 679 insertions(+), 682 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java new file mode 100644 index 0000000..019d8a3 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Window; +import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; +import static org.apache.spark.sql.functions.*; +import org.apache.spark.sql.hive.test.TestHive$; +import org.apache.spark.sql.hive.aggregate.MyDoubleSum; + +public class JavaDataFrameSuite { + private transient JavaSparkContext sc; + private transient HiveContext hc; + + DataFrame df; + + private void checkAnswer(DataFrame actual, List<Row> expected) { + String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected); + if (errorMessage != null) { + Assert.fail(errorMessage); + } + } + + @Before + public void setUp() throws IOException { + hc = TestHive$.MODULE$; + sc = new JavaSparkContext(hc.sparkContext()); + + List<String> jsonObjects = new ArrayList<String>(10); + for (int i = 0; i < 10; i++) { + jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}"); + } + df = hc.read().json(sc.parallelize(jsonObjects)); + df.registerTempTable("window_table"); + } + + @After + public void tearDown() throws IOException { + // Clean up tables. 
+ if (hc != null) { + hc.sql("DROP TABLE IF EXISTS window_table"); + } + } + + @Test + public void saveTableAndQueryIt() { + checkAnswer( + df.select(functions.avg("key").over( + Window.partitionBy("value").orderBy("key").rowsBetween(-1, 1))), + hc.sql("SELECT avg(key) " + + "OVER (PARTITION BY value " + + " ORDER BY key " + + " ROWS BETWEEN 1 preceding and 1 following) " + + "FROM window_table").collectAsList()); + } + + @Test + public void testUDAF() { + DataFrame df = hc.range(0, 100).unionAll(hc.range(0, 100)).select(col("id").as("value")); + UserDefinedAggregateFunction udaf = new MyDoubleSum(); + UserDefinedAggregateFunction registeredUDAF = hc.udf().register("mydoublesum", udaf); + // Create Columns for the UDAF. For now, callUDF does not take an argument to specify if + // we want to use distinct aggregation. + DataFrame aggregatedDF = + df.groupBy() + .agg( + udaf.distinct(col("value")), + udaf.apply(col("value")), + registeredUDAF.apply(col("value")), + callUDF("mydoublesum", col("value"))); + + List<Row> expectedResult = new ArrayList<Row>(); + expectedResult.add(RowFactory.create(4950.0, 9900.0, 9900.0, 9900.0)); + checkAnswer( + aggregatedDF, + expectedResult); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java new file mode 100644 index 0000000..4192155 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.SaveMode; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.QueryTest$; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.hive.test.TestHive$; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.util.Utils; + +public class JavaMetastoreDataSourcesSuite { + private transient JavaSparkContext sc; + private transient HiveContext sqlContext; + + String originalDefaultSource; + File path; + Path hiveManagedPath; + FileSystem fs; + DataFrame df; + + private void checkAnswer(DataFrame actual, List<Row> expected) { + String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected); + if (errorMessage != null) { + Assert.fail(errorMessage); + } + } + + @Before + public void setUp() throws IOException { + sqlContext = TestHive$.MODULE$; + sc = new 
JavaSparkContext(sqlContext.sparkContext()); + + originalDefaultSource = sqlContext.conf().defaultDataSourceName(); + path = + Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile(); + if (path.exists()) { + path.delete(); + } + hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable")); + fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); + if (fs.exists(hiveManagedPath)){ + fs.delete(hiveManagedPath, true); + } + + List<String> jsonObjects = new ArrayList<String>(10); + for (int i = 0; i < 10; i++) { + jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}"); + } + JavaRDD<String> rdd = sc.parallelize(jsonObjects); + df = sqlContext.read().json(rdd); + df.registerTempTable("jsonTable"); + } + + @After + public void tearDown() throws IOException { + // Clean up tables. + if (sqlContext != null) { + sqlContext.sql("DROP TABLE IF EXISTS javaSavedTable"); + sqlContext.sql("DROP TABLE IF EXISTS externalTable"); + } + } + + @Test + public void saveExternalTableAndQueryIt() { + Map<String, String> options = new HashMap<String, String>(); + options.put("path", path.toString()); + df.write() + .format("org.apache.spark.sql.json") + .mode(SaveMode.Append) + .options(options) + .saveAsTable("javaSavedTable"); + + checkAnswer( + sqlContext.sql("SELECT * FROM javaSavedTable"), + df.collectAsList()); + + DataFrame loadedDF = + sqlContext.createExternalTable("externalTable", "org.apache.spark.sql.json", options); + + checkAnswer(loadedDF, df.collectAsList()); + checkAnswer( + sqlContext.sql("SELECT * FROM externalTable"), + df.collectAsList()); + } + + @Test + public void saveExternalTableWithSchemaAndQueryIt() { + Map<String, String> options = new HashMap<String, String>(); + options.put("path", path.toString()); + df.write() + .format("org.apache.spark.sql.json") + .mode(SaveMode.Append) + .options(options) + .saveAsTable("javaSavedTable"); + + checkAnswer( + sqlContext.sql("SELECT * FROM 
javaSavedTable"), + df.collectAsList()); + + List<StructField> fields = new ArrayList<StructField>(); + fields.add(DataTypes.createStructField("b", DataTypes.StringType, true)); + StructType schema = DataTypes.createStructType(fields); + DataFrame loadedDF = + sqlContext.createExternalTable("externalTable", "org.apache.spark.sql.json", schema, options); + + checkAnswer( + loadedDF, + sqlContext.sql("SELECT b FROM javaSavedTable").collectAsList()); + checkAnswer( + sqlContext.sql("SELECT * FROM externalTable"), + sqlContext.sql("SELECT b FROM javaSavedTable").collectAsList()); + } + + @Test + public void saveTableAndQueryIt() { + Map<String, String> options = new HashMap<String, String>(); + df.write() + .format("org.apache.spark.sql.json") + .mode(SaveMode.Append) + .options(options) + .saveAsTable("javaSavedTable"); + + checkAnswer( + sqlContext.sql("SELECT * FROM javaSavedTable"), + df.collectAsList()); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java new file mode 100644 index 0000000..5a167ed --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.aggregate; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.expressions.MutableAggregationBuffer; +import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * An example {@link UserDefinedAggregateFunction} to calculate a special average value of a + * {@link org.apache.spark.sql.types.DoubleType} column. This special average value is the sum + * of the average value of input values and 100.0. + */ +public class MyDoubleAvg extends UserDefinedAggregateFunction { + + private StructType _inputDataType; + + private StructType _bufferSchema; + + private DataType _returnDataType; + + public MyDoubleAvg() { + List<StructField> inputFields = new ArrayList<StructField>(); + inputFields.add(DataTypes.createStructField("inputDouble", DataTypes.DoubleType, true)); + _inputDataType = DataTypes.createStructType(inputFields); + + // The buffer has two values, bufferSum for storing the current sum and + // bufferCount for storing the number of non-null input values that have been contributed + // to the current sum.
+ List<StructField> bufferFields = new ArrayList<StructField>(); + bufferFields.add(DataTypes.createStructField("bufferSum", DataTypes.DoubleType, true)); + bufferFields.add(DataTypes.createStructField("bufferCount", DataTypes.LongType, true)); + _bufferSchema = DataTypes.createStructType(bufferFields); + + _returnDataType = DataTypes.DoubleType; + } + + @Override public StructType inputSchema() { + return _inputDataType; + } + + @Override public StructType bufferSchema() { + return _bufferSchema; + } + + @Override public DataType dataType() { + return _returnDataType; + } + + @Override public boolean deterministic() { + return true; + } + + @Override public void initialize(MutableAggregationBuffer buffer) { + // The initial value of the sum is null. + buffer.update(0, null); + // The initial value of the count is 0. + buffer.update(1, 0L); + } + + @Override public void update(MutableAggregationBuffer buffer, Row input) { + // This input Row only has a single column storing the input value in Double. + // We only update the buffer when the input value is not null. + if (!input.isNullAt(0)) { + // If the buffer value (the intermediate result of the sum) is still null, + // we set the input value to the buffer and set the bufferCount to 1. + if (buffer.isNullAt(0)) { + buffer.update(0, input.getDouble(0)); + buffer.update(1, 1L); + } else { + // Otherwise, update the bufferSum and increment bufferCount. + Double newValue = input.getDouble(0) + buffer.getDouble(0); + buffer.update(0, newValue); + buffer.update(1, buffer.getLong(1) + 1L); + } + } + } + + @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) { + // buffer1 and buffer2 have the same structure. + // We only update the buffer1 when the input buffer2's sum value is not null. + if (!buffer2.isNullAt(0)) { + if (buffer1.isNullAt(0)) { + // If the buffer value (intermediate result of the sum) is still null, + // we set it to the input buffer's value.
+ buffer1.update(0, buffer2.getDouble(0)); + buffer1.update(1, buffer2.getLong(1)); + } else { + // Otherwise, we update the bufferSum and bufferCount. + Double newValue = buffer2.getDouble(0) + buffer1.getDouble(0); + buffer1.update(0, newValue); + buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1)); + } + } + } + + @Override public Object evaluate(Row buffer) { + if (buffer.isNullAt(0)) { + // If the bufferSum is still null, we return null because this function has not got + // any input row. + return null; + } else { + // Otherwise, we calculate the special average value. + return buffer.getDouble(0) / buffer.getLong(1) + 100.0; + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java new file mode 100644 index 0000000..c3b7768 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.aggregate; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.spark.sql.expressions.MutableAggregationBuffer; +import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.Row; + +/** + * An example {@link UserDefinedAggregateFunction} to calculate the sum of a + * {@link org.apache.spark.sql.types.DoubleType} column. + */ +public class MyDoubleSum extends UserDefinedAggregateFunction { + + private StructType _inputDataType; + + private StructType _bufferSchema; + + private DataType _returnDataType; + + public MyDoubleSum() { + List<StructField> inputFields = new ArrayList<StructField>(); + inputFields.add(DataTypes.createStructField("inputDouble", DataTypes.DoubleType, true)); + _inputDataType = DataTypes.createStructType(inputFields); + + List<StructField> bufferFields = new ArrayList<StructField>(); + bufferFields.add(DataTypes.createStructField("bufferDouble", DataTypes.DoubleType, true)); + _bufferSchema = DataTypes.createStructType(bufferFields); + + _returnDataType = DataTypes.DoubleType; + } + + @Override public StructType inputSchema() { + return _inputDataType; + } + + @Override public StructType bufferSchema() { + return _bufferSchema; + } + + @Override public DataType dataType() { + return _returnDataType; + } + + @Override public boolean deterministic() { + return true; + } + + @Override public void initialize(MutableAggregationBuffer buffer) { + // The initial value of the sum is null. + buffer.update(0, null); + } + + @Override public void update(MutableAggregationBuffer buffer, Row input) { + // This input Row only has a single column storing the input value in Double. + // We only update the buffer when the input value is not null. 
+ if (!input.isNullAt(0)) { + if (buffer.isNullAt(0)) { + // If the buffer value (the intermediate result of the sum) is still null, + // we set the input value to the buffer. + buffer.update(0, input.getDouble(0)); + } else { + // Otherwise, we add the input value to the buffer value. + Double newValue = input.getDouble(0) + buffer.getDouble(0); + buffer.update(0, newValue); + } + } + } + + @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) { + // buffer1 and buffer2 have the same structure. + // We only update the buffer1 when the input buffer2's value is not null. + if (!buffer2.isNullAt(0)) { + if (buffer1.isNullAt(0)) { + // If the buffer value (intermediate result of the sum) is still null, + // we set it to the input buffer's value. + buffer1.update(0, buffer2.getDouble(0)); + } else { + // Otherwise, we add the input buffer's value (buffer2) to the mutable + // buffer's value (buffer1). + Double newValue = buffer2.getDouble(0) + buffer1.getDouble(0); + buffer1.update(0, newValue); + } + } + } + + @Override public Object evaluate(Row buffer) { + if (buffer.isNullAt(0)) { + // If the buffer value is still null, we return null. + return null; + } else { + // Otherwise, the intermediate sum is the final result. + return buffer.getDouble(0); + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFIntegerToString.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFIntegerToString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFIntegerToString.java new file mode 100644 index 0000000..6c4f378 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFIntegerToString.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution; + +import org.apache.hadoop.hive.ql.exec.UDF; + +public class UDFIntegerToString extends UDF { + public String evaluate(Integer i) { + return i.toString(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListListInt.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListListInt.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListListInt.java new file mode 100644 index 0000000..808e298 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListListInt.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution; + +import org.apache.hadoop.hive.ql.exec.UDF; + +import java.util.List; + +public class UDFListListInt extends UDF { + /** + * @param obj + * SQL schema: array<struct<x: int, y: int, z: int>> + * Java Type: List<List<Integer>> + */ + @SuppressWarnings("unchecked") + public long evaluate(Object obj) { + if (obj == null) { + return 0L; + } + List<List<?>> listList = (List<List<?>>) obj; + long retVal = 0; + for (List<?> aList : listList) { + Number someInt = (Number) aList.get(1); + try { + retVal += someInt.longValue(); + } catch (NullPointerException e) { + System.out.println(e); + } + } + return retVal; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListString.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListString.java new file mode 100644 index 0000000..f33210e --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFListString.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution; + +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.ql.exec.UDF; + +public class UDFListString extends UDF { + + public String evaluate(Object a) { + if (a == null) { + return null; + } + @SuppressWarnings("unchecked") + List<Object> s = (List<Object>) a; + + return StringUtils.join(s, ','); + } + + +} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFStringString.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFStringString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFStringString.java new file mode 100644 index 0000000..a369188 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFStringString.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution; + +import org.apache.hadoop.hive.ql.exec.UDF; + +public class UDFStringString extends UDF { + public String evaluate(String s1, String s2) { + return s1 + " " + s2; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFTwoListList.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFTwoListList.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFTwoListList.java new file mode 100644 index 0000000..0165591 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFTwoListList.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution; + +import org.apache.hadoop.hive.ql.exec.UDF; + +public class UDFTwoListList extends UDF { + public String evaluate(Object o1, Object o2) { + UDFListListInt udf = new UDFListListInt(); + + return String.format("%s, %s", udf.evaluate(o1), udf.evaluate(o2)); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java deleted file mode 100644 index a30dfa5..0000000 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package test.org.apache.spark.sql.hive; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.*; -import org.apache.spark.sql.expressions.Window; -import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; -import static org.apache.spark.sql.functions.*; -import org.apache.spark.sql.hive.HiveContext; -import org.apache.spark.sql.hive.test.TestHive$; -import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; -import test.org.apache.spark.sql.hive.aggregate.MyDoubleSum; - -public class JavaDataFrameSuite { - private transient JavaSparkContext sc; - private transient HiveContext hc; - - DataFrame df; - - private void checkAnswer(DataFrame actual, List<Row> expected) { - String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected); - if (errorMessage != null) { - Assert.fail(errorMessage); - } - } - - @Before - public void setUp() throws IOException { - hc = TestHive$.MODULE$; - sc = new JavaSparkContext(hc.sparkContext()); - - List<String> jsonObjects = new ArrayList<String>(10); - for (int i = 0; i < 10; i++) { - jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}"); - } - df = hc.read().json(sc.parallelize(jsonObjects)); - df.registerTempTable("window_table"); - } - - @After - public void tearDown() throws IOException { - // Clean up tables. 
- if (hc != null) { - hc.sql("DROP TABLE IF EXISTS window_table"); - } - } - - @Test - public void saveTableAndQueryIt() { - checkAnswer( - df.select(functions.avg("key").over( - Window.partitionBy("value").orderBy("key").rowsBetween(-1, 1))), - hc.sql("SELECT avg(key) " + - "OVER (PARTITION BY value " + - " ORDER BY key " + - " ROWS BETWEEN 1 preceding and 1 following) " + - "FROM window_table").collectAsList()); - } - - @Test - public void testUDAF() { - DataFrame df = hc.range(0, 100).unionAll(hc.range(0, 100)).select(col("id").as("value")); - UserDefinedAggregateFunction udaf = new MyDoubleSum(); - UserDefinedAggregateFunction registeredUDAF = hc.udf().register("mydoublesum", udaf); - // Create Columns for the UDAF. For now, callUDF does not take an argument to specific if - // we want to use distinct aggregation. - DataFrame aggregatedDF = - df.groupBy() - .agg( - udaf.distinct(col("value")), - udaf.apply(col("value")), - registeredUDAF.apply(col("value")), - callUDF("mydoublesum", col("value"))); - - List<Row> expectedResult = new ArrayList<Row>(); - expectedResult.add(RowFactory.create(4950.0, 9900.0, 9900.0, 9900.0)); - checkAnswer( - aggregatedDF, - expectedResult); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java deleted file mode 100644 index 15c2c3d..0000000 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark.sql.hive; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.spark.sql.SaveMode; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.QueryTest$; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.hive.HiveContext; -import org.apache.spark.sql.hive.test.TestHive$; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.util.Utils; - -public class JavaMetastoreDataSourcesSuite { - private transient JavaSparkContext sc; - private transient HiveContext sqlContext; - - String originalDefaultSource; - File path; - Path hiveManagedPath; - FileSystem fs; - DataFrame df; - - private void checkAnswer(DataFrame actual, List<Row> expected) { - String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected); - if 
(errorMessage != null) { - Assert.fail(errorMessage); - } - } - - @Before - public void setUp() throws IOException { - sqlContext = TestHive$.MODULE$; - sc = new JavaSparkContext(sqlContext.sparkContext()); - - originalDefaultSource = sqlContext.conf().defaultDataSourceName(); - path = - Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile(); - if (path.exists()) { - path.delete(); - } - hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable")); - fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); - if (fs.exists(hiveManagedPath)){ - fs.delete(hiveManagedPath, true); - } - - List<String> jsonObjects = new ArrayList<String>(10); - for (int i = 0; i < 10; i++) { - jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}"); - } - JavaRDD<String> rdd = sc.parallelize(jsonObjects); - df = sqlContext.read().json(rdd); - df.registerTempTable("jsonTable"); - } - - @After - public void tearDown() throws IOException { - // Clean up tables. 
- if (sqlContext != null) { - sqlContext.sql("DROP TABLE IF EXISTS javaSavedTable"); - sqlContext.sql("DROP TABLE IF EXISTS externalTable"); - } - } - - @Test - public void saveExternalTableAndQueryIt() { - Map<String, String> options = new HashMap<String, String>(); - options.put("path", path.toString()); - df.write() - .format("org.apache.spark.sql.json") - .mode(SaveMode.Append) - .options(options) - .saveAsTable("javaSavedTable"); - - checkAnswer( - sqlContext.sql("SELECT * FROM javaSavedTable"), - df.collectAsList()); - - DataFrame loadedDF = - sqlContext.createExternalTable("externalTable", "org.apache.spark.sql.json", options); - - checkAnswer(loadedDF, df.collectAsList()); - checkAnswer( - sqlContext.sql("SELECT * FROM externalTable"), - df.collectAsList()); - } - - @Test - public void saveExternalTableWithSchemaAndQueryIt() { - Map<String, String> options = new HashMap<String, String>(); - options.put("path", path.toString()); - df.write() - .format("org.apache.spark.sql.json") - .mode(SaveMode.Append) - .options(options) - .saveAsTable("javaSavedTable"); - - checkAnswer( - sqlContext.sql("SELECT * FROM javaSavedTable"), - df.collectAsList()); - - List<StructField> fields = new ArrayList<StructField>(); - fields.add(DataTypes.createStructField("b", DataTypes.StringType, true)); - StructType schema = DataTypes.createStructType(fields); - DataFrame loadedDF = - sqlContext.createExternalTable("externalTable", "org.apache.spark.sql.json", schema, options); - - checkAnswer( - loadedDF, - sqlContext.sql("SELECT b FROM javaSavedTable").collectAsList()); - checkAnswer( - sqlContext.sql("SELECT * FROM externalTable"), - sqlContext.sql("SELECT b FROM javaSavedTable").collectAsList()); - } - - @Test - public void saveTableAndQueryIt() { - Map<String, String> options = new HashMap<String, String>(); - df.write() - .format("org.apache.spark.sql.json") - .mode(SaveMode.Append) - .options(options) - .saveAsTable("javaSavedTable"); - - checkAnswer( - 
sqlContext.sql("SELECT * FROM javaSavedTable"), - df.collectAsList()); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java deleted file mode 100644 index 2961b80..0000000 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package test.org.apache.spark.sql.hive.aggregate; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.spark.sql.Row; -import org.apache.spark.sql.expressions.MutableAggregationBuffer; -import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; - -/** - * An example {@link UserDefinedAggregateFunction} to calculate a special average value of a - * {@link org.apache.spark.sql.types.DoubleType} column. This special average value is the sum - * of the average value of input values and 100.0. - */ -public class MyDoubleAvg extends UserDefinedAggregateFunction { - - private StructType _inputDataType; - - private StructType _bufferSchema; - - private DataType _returnDataType; - - public MyDoubleAvg() { - List<StructField> inputFields = new ArrayList<StructField>(); - inputFields.add(DataTypes.createStructField("inputDouble", DataTypes.DoubleType, true)); - _inputDataType = DataTypes.createStructType(inputFields); - - // The buffer has two values, bufferSum for storing the current sum and - // bufferCount for storing the number of non-null input values that have been contribuetd - // to the current sum. 
- List<StructField> bufferFields = new ArrayList<StructField>(); - bufferFields.add(DataTypes.createStructField("bufferSum", DataTypes.DoubleType, true)); - bufferFields.add(DataTypes.createStructField("bufferCount", DataTypes.LongType, true)); - _bufferSchema = DataTypes.createStructType(bufferFields); - - _returnDataType = DataTypes.DoubleType; - } - - @Override public StructType inputSchema() { - return _inputDataType; - } - - @Override public StructType bufferSchema() { - return _bufferSchema; - } - - @Override public DataType dataType() { - return _returnDataType; - } - - @Override public boolean deterministic() { - return true; - } - - @Override public void initialize(MutableAggregationBuffer buffer) { - // The initial value of the sum is null. - buffer.update(0, null); - // The initial value of the count is 0. - buffer.update(1, 0L); - } - - @Override public void update(MutableAggregationBuffer buffer, Row input) { - // This input Row only has a single column storing the input value in Double. - // We only update the buffer when the input value is not null. - if (!input.isNullAt(0)) { - // If the buffer value (the intermediate result of the sum) is still null, - // we set the input value to the buffer and set the bufferCount to 1. - if (buffer.isNullAt(0)) { - buffer.update(0, input.getDouble(0)); - buffer.update(1, 1L); - } else { - // Otherwise, update the bufferSum and increment bufferCount. - Double newValue = input.getDouble(0) + buffer.getDouble(0); - buffer.update(0, newValue); - buffer.update(1, buffer.getLong(1) + 1L); - } - } - } - - @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) { - // buffer1 and buffer2 have the same structure. - // We only update the buffer1 when the input buffer2's sum value is not null. - if (!buffer2.isNullAt(0)) { - if (buffer1.isNullAt(0)) { - // If the buffer value (intermediate result of the sum) is still null, - // we set the it as the input buffer's value. 
- buffer1.update(0, buffer2.getDouble(0)); - buffer1.update(1, buffer2.getLong(1)); - } else { - // Otherwise, we update the bufferSum and bufferCount. - Double newValue = buffer2.getDouble(0) + buffer1.getDouble(0); - buffer1.update(0, newValue); - buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1)); - } - } - } - - @Override public Object evaluate(Row buffer) { - if (buffer.isNullAt(0)) { - // If the bufferSum is still null, we return null because this function has not got - // any input row. - return null; - } else { - // Otherwise, we calculate the special average value. - return buffer.getDouble(0) / buffer.getLong(1) + 100.0; - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java deleted file mode 100644 index c71882a..0000000 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark.sql.hive.aggregate; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.spark.sql.expressions.MutableAggregationBuffer; -import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.Row; - -/** - * An example {@link UserDefinedAggregateFunction} to calculate the sum of a - * {@link org.apache.spark.sql.types.DoubleType} column. - */ -public class MyDoubleSum extends UserDefinedAggregateFunction { - - private StructType _inputDataType; - - private StructType _bufferSchema; - - private DataType _returnDataType; - - public MyDoubleSum() { - List<StructField> inputFields = new ArrayList<StructField>(); - inputFields.add(DataTypes.createStructField("inputDouble", DataTypes.DoubleType, true)); - _inputDataType = DataTypes.createStructType(inputFields); - - List<StructField> bufferFields = new ArrayList<StructField>(); - bufferFields.add(DataTypes.createStructField("bufferDouble", DataTypes.DoubleType, true)); - _bufferSchema = DataTypes.createStructType(bufferFields); - - _returnDataType = DataTypes.DoubleType; - } - - @Override public StructType inputSchema() { - return _inputDataType; - } - - @Override public StructType bufferSchema() { - return _bufferSchema; - } - - @Override public DataType dataType() { - return _returnDataType; - } - - @Override public boolean deterministic() { - return true; - } - - @Override public void initialize(MutableAggregationBuffer buffer) { - // The initial value of the sum is null. 
- buffer.update(0, null); - } - - @Override public void update(MutableAggregationBuffer buffer, Row input) { - // This input Row only has a single column storing the input value in Double. - // We only update the buffer when the input value is not null. - if (!input.isNullAt(0)) { - if (buffer.isNullAt(0)) { - // If the buffer value (the intermediate result of the sum) is still null, - // we set the input value to the buffer. - buffer.update(0, input.getDouble(0)); - } else { - // Otherwise, we add the input value to the buffer value. - Double newValue = input.getDouble(0) + buffer.getDouble(0); - buffer.update(0, newValue); - } - } - } - - @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) { - // buffer1 and buffer2 have the same structure. - // We only update the buffer1 when the input buffer2's value is not null. - if (!buffer2.isNullAt(0)) { - if (buffer1.isNullAt(0)) { - // If the buffer value (intermediate result of the sum) is still null, - // we set the it as the input buffer's value. - buffer1.update(0, buffer2.getDouble(0)); - } else { - // Otherwise, we add the input buffer's value (buffer1) to the mutable - // buffer's value (buffer2). - Double newValue = buffer2.getDouble(0) + buffer1.getDouble(0); - buffer1.update(0, newValue); - } - } - } - - @Override public Object evaluate(Row buffer) { - if (buffer.isNullAt(0)) { - // If the buffer value is still null, we return null. - return null; - } else { - // Otherwise, the intermediate sum is the final result. 
- return buffer.getDouble(0); - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFIntegerToString.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFIntegerToString.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFIntegerToString.java deleted file mode 100644 index 6c4f378..0000000 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFIntegerToString.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.execution; - -import org.apache.hadoop.hive.ql.exec.UDF; - -public class UDFIntegerToString extends UDF { - public String evaluate(Integer i) { - return i.toString(); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFListListInt.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFListListInt.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFListListInt.java deleted file mode 100644 index 808e298..0000000 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFListListInt.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.execution; - -import org.apache.hadoop.hive.ql.exec.UDF; - -import java.util.List; - -public class UDFListListInt extends UDF { - /** - * @param obj - * SQL schema: array<struct<x: int, y: int, z: int>> - * Java Type: List<List<Integer>> - */ - @SuppressWarnings("unchecked") - public long evaluate(Object obj) { - if (obj == null) { - return 0L; - } - List<List<?>> listList = (List<List<?>>) obj; - long retVal = 0; - for (List<?> aList : listList) { - Number someInt = (Number) aList.get(1); - try { - retVal += someInt.longValue(); - } catch (NullPointerException e) { - System.out.println(e); - } - } - return retVal; - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFListString.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFListString.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFListString.java deleted file mode 100644 index f33210e..0000000 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFListString.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution; - -import java.util.List; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hive.ql.exec.UDF; - -public class UDFListString extends UDF { - - public String evaluate(Object a) { - if (a == null) { - return null; - } - @SuppressWarnings("unchecked") - List<Object> s = (List<Object>) a; - - return StringUtils.join(s, ','); - } - - -} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFStringString.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFStringString.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFStringString.java deleted file mode 100644 index a369188..0000000 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFStringString.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.execution; - -import org.apache.hadoop.hive.ql.exec.UDF; - -public class UDFStringString extends UDF { - public String evaluate(String s1, String s2) { - return s1 + " " + s2; - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFTwoListList.java ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFTwoListList.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFTwoListList.java deleted file mode 100644 index 0165591..0000000 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/execution/UDFTwoListList.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.execution; - -import org.apache.hadoop.hive.ql.exec.UDF; - -public class UDFTwoListList extends UDF { - public String evaluate(Object o1, Object o2) { - UDFListListInt udf = new UDFListListInt(); - - return String.format("%s, %s", udf.evaluate(o1), udf.evaluate(o2)); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/cb2d2e15/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 119663a..4886a85 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -import _root_.test.org.apache.spark.sql.hive.aggregate.{MyDoubleAvg, MyDoubleSum} +import org.apache.spark.sql.hive.aggregate.{MyDoubleAvg, MyDoubleSum} abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with BeforeAndAfterAll { override def _sqlContext: SQLContext = TestHive --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org