Repository: incubator-livy Updated Branches: refs/heads/master cb5b8aac5 -> 412ccc8fc
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/jobs/Failure.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/java/org/apache/livy/test/jobs/Failure.java b/test-lib/src/main/java/org/apache/livy/test/jobs/Failure.java new file mode 100644 index 0000000..f56f400 --- /dev/null +++ b/test-lib/src/main/java/org/apache/livy/test/jobs/Failure.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.test.jobs; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class Failure implements Job<Void> { + + @Override + public Void call(JobContext jc) { + throw new JobFailureException(); + } + + public static class JobFailureException extends RuntimeException { + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/jobs/FileReader.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/java/org/apache/livy/test/jobs/FileReader.java b/test-lib/src/main/java/org/apache/livy/test/jobs/FileReader.java new file mode 100644 index 0000000..8b22c8f --- /dev/null +++ b/test-lib/src/main/java/org/apache/livy/test/jobs/FileReader.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.test.jobs; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.apache.spark.SparkFiles; +import org.apache.spark.api.java.function.Function; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class FileReader implements Job<String> { + + private final boolean isResource; + private final String fileName; + + public FileReader(String fileName, boolean isResource) { + this.fileName = fileName; + this.isResource = isResource; + } + + @Override + public String call(JobContext jc) { + return jc.sc().parallelize(Arrays.asList(1)).map(new Reader()).collect().get(0); + } + + private class Reader implements Function<Integer, String> { + + @Override + public String call(Integer i) throws Exception { + InputStream in; + if (isResource) { + ClassLoader ccl = Thread.currentThread().getContextClassLoader(); + in = ccl.getResourceAsStream(fileName); + if (in == null) { + throw new IOException("Resource not found: " + fileName); + } + } else { + in = new FileInputStream(SparkFiles.get(fileName)); + } + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buf = new byte[1024]; + int read; + while ((read = in.read(buf)) >= 0) { + out.write(buf, 0, read); + } + byte[] bytes = out.toByteArray(); + return new String(bytes, 0, bytes.length, UTF_8); + } finally { + in.close(); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/jobs/GetCurrentUser.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/java/org/apache/livy/test/jobs/GetCurrentUser.java b/test-lib/src/main/java/org/apache/livy/test/jobs/GetCurrentUser.java new file mode 100644 index 0000000..28a6ddc --- /dev/null +++ b/test-lib/src/main/java/org/apache/livy/test/jobs/GetCurrentUser.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.test.jobs; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class GetCurrentUser implements Job<String> { + + @Override + public String call(JobContext jc) throws Exception { + return UserGroupInformation.getCurrentUser().getUserName(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/jobs/SQLGetTweets.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/java/org/apache/livy/test/jobs/SQLGetTweets.java b/test-lib/src/main/java/org/apache/livy/test/jobs/SQLGetTweets.java new file mode 100644 index 0000000..a17f188 --- /dev/null +++ b/test-lib/src/main/java/org/apache/livy/test/jobs/SQLGetTweets.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.test.jobs; + +import java.io.File; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class SQLGetTweets implements Job<List<String>> { + + private final boolean useHiveContext; + + public SQLGetTweets(boolean useHiveContext) { + this.useHiveContext = useHiveContext; + } + + @Override + public List<String> call(JobContext jc) throws Exception { + InputStream source = getClass().getResourceAsStream("/testweet.json"); + + // Save the resource as a file in HDFS (or the local tmp dir when using a local filesystem). + URI input; + File local = File.createTempFile("tweets", ".json", jc.getLocalTmpDir()); + Files.copy(source, local.toPath(), StandardCopyOption.REPLACE_EXISTING); + FileSystem fs = FileSystem.get(jc.sc().sc().hadoopConfiguration()); + if ("file".equals(fs.getUri().getScheme())) { + input = local.toURI(); + } else { + String uuid = UUID.randomUUID().toString(); + Path target = new Path("/tmp/" + uuid + "-tweets.json"); + fs.copyFromLocalFile(new Path(local.toURI()), target); + input = target.toUri(); + } + + SQLContext sqlctx = useHiveContext ? jc.hivectx() : jc.sqlctx(); + sqlctx.jsonFile(input.toString()).registerTempTable("tweets"); + + List<String> tweetList = new ArrayList<>(); + Row[] result = + (Row[])(sqlctx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10") + .collect()); + for (Row r : result) { + tweetList.add(r.toString()); + } + return tweetList; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/jobs/Sleeper.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/java/org/apache/livy/test/jobs/Sleeper.java b/test-lib/src/main/java/org/apache/livy/test/jobs/Sleeper.java new file mode 100644 index 0000000..89fe785 --- /dev/null +++ b/test-lib/src/main/java/org/apache/livy/test/jobs/Sleeper.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.test.jobs; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class Sleeper implements Job<Void> { + + private final long millis; + + public Sleeper(long millis) { + this.millis = millis; + } + + @Override + public Void call(JobContext jc) throws Exception { + Thread.sleep(millis); + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/jobs/SmallCount.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/java/org/apache/livy/test/jobs/SmallCount.java b/test-lib/src/main/java/org/apache/livy/test/jobs/SmallCount.java new file mode 100644 index 0000000..e7c8ee4 --- /dev/null +++ b/test-lib/src/main/java/org/apache/livy/test/jobs/SmallCount.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.test.jobs; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class SmallCount implements Job<Long> { + + private final int count; + + public SmallCount(int count) { + this.count = count; + } + + @Override + public Long call(JobContext jc) { + Random r = new Random(); + int partitions = Math.min(r.nextInt(10) + 1, count); + + List<Integer> elements = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + elements.add(r.nextInt()); + } + + return jc.sc().parallelize(elements, partitions).count(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/java/org/apache/livy/test/jobs/VoidJob.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/java/org/apache/livy/test/jobs/VoidJob.java b/test-lib/src/main/java/org/apache/livy/test/jobs/VoidJob.java new file mode 100644 index 0000000..d6875c8 --- /dev/null +++ b/test-lib/src/main/java/org/apache/livy/test/jobs/VoidJob.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.test.jobs; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class VoidJob implements Job<Void> { + @Override + public Void call(JobContext jc) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/scala/com/cloudera/livy/test/jobs/ScalaEcho.scala ---------------------------------------------------------------------- diff --git a/test-lib/src/main/scala/com/cloudera/livy/test/jobs/ScalaEcho.scala b/test-lib/src/main/scala/com/cloudera/livy/test/jobs/ScalaEcho.scala deleted file mode 100644 index e504d69..0000000 --- a/test-lib/src/main/scala/com/cloudera/livy/test/jobs/ScalaEcho.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.test.jobs - -import scala.reflect.ClassTag - -import com.cloudera.livy.{Job, JobContext} - -case class ValueHolder[T](value: T) - -class ScalaEcho[T: ClassTag](val value: T)(implicit val tag: ClassTag[T]) extends Job[T] { - - override def call(jc: JobContext): T = { - jc.sc().sc.parallelize(Seq(value), 1)(tag).collect()(0) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/scala/org/apache/livy/test/jobs/ScalaEcho.scala ---------------------------------------------------------------------- diff --git a/test-lib/src/main/scala/org/apache/livy/test/jobs/ScalaEcho.scala b/test-lib/src/main/scala/org/apache/livy/test/jobs/ScalaEcho.scala new file mode 100644 index 0000000..a4c6032 --- /dev/null +++ b/test-lib/src/main/scala/org/apache/livy/test/jobs/ScalaEcho.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.test.jobs + +import scala.reflect.ClassTag + +import org.apache.livy.{Job, JobContext} + +case class ValueHolder[T](value: T) + +class ScalaEcho[T: ClassTag](val value: T)(implicit val tag: ClassTag[T]) extends Job[T] { + + override def call(jc: JobContext): T = { + jc.sc().sc.parallelize(Seq(value), 1)(tag).collect()(0) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/DatasetTest.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/DatasetTest.java b/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/DatasetTest.java deleted file mode 100644 index 4259d5e..0000000 --- a/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/DatasetTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.test.jobs.spark2; - -import java.util.Arrays; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; - -import com.cloudera.livy.Job; -import com.cloudera.livy.JobContext; - -public class DatasetTest implements Job<Long> { - - @Override - public Long call(JobContext jc) throws Exception { - SparkSession spark = jc.sparkSession(); - - JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - JavaRDD<Row> rdd = sc.parallelize(Arrays.asList(1, 2, 3)).map( - new Function<Integer, Row>() { - public Row call(Integer integer) throws Exception { - return RowFactory.create(integer); - } - }); - StructType schema = DataTypes.createStructType(new StructField[] { - DataTypes.createStructField("value", DataTypes.IntegerType, false) - }); - - Dataset<Row> ds = spark.createDataFrame(rdd, schema); - - return ds.filter(new FilterFunction<Row>() { - @Override - public boolean call(Row row) throws Exception { - return row.getInt(0) >= 2; - } - }).count(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/SparkSessionTest.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/SparkSessionTest.java b/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/SparkSessionTest.java deleted file mode 100644 index 1019670..0000000 --- a/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/SparkSessionTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.test.jobs.spark2; - -import java.util.Arrays; - -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; - -import com.cloudera.livy.Job; -import com.cloudera.livy.JobContext; - -public class SparkSessionTest implements Job<Long> { - - @Override - public Long call(JobContext jc) throws Exception { - // Make sure SparkSession and SparkContext is callable - SparkSession session = jc.sparkSession(); - - JavaSparkContext sc = new JavaSparkContext(session.sparkContext()); - return sc.parallelize(Arrays.asList(1, 2, 3)).count(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/DatasetTest.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/DatasetTest.java b/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/DatasetTest.java new file mode 100644 index 0000000..92940c5 --- /dev/null +++ b/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/DatasetTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.test.jobs.spark2; + +import java.util.Arrays; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class DatasetTest implements Job<Long> { + + @Override + public Long call(JobContext jc) throws Exception { + SparkSession spark = jc.sparkSession(); + + JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + JavaRDD<Row> rdd = sc.parallelize(Arrays.asList(1, 2, 3)).map( + new Function<Integer, Row>() { + public Row call(Integer integer) throws Exception { + return RowFactory.create(integer); + } + }); + StructType schema = DataTypes.createStructType(new StructField[] { + DataTypes.createStructField("value", DataTypes.IntegerType, false) + }); + + Dataset<Row> ds = spark.createDataFrame(rdd, schema); + + return ds.filter(new FilterFunction<Row>() { + @Override + public boolean call(Row row) throws Exception { + return row.getInt(0) >= 2; + } + }).count(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/SparkSessionTest.java ---------------------------------------------------------------------- diff --git a/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/SparkSessionTest.java b/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/SparkSessionTest.java new file mode 100644 index 0000000..d04d5e5 --- /dev/null +++ b/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/SparkSessionTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.test.jobs.spark2; + +import java.util.Arrays; + +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +import org.apache.livy.Job; +import org.apache.livy.JobContext; + +public class SparkSessionTest implements Job<Long> { + + @Override + public Long call(JobContext jc) throws Exception { + // Make sure SparkSession and SparkContext is callable + SparkSession session = jc.sparkSession(); + + JavaSparkContext sc = new JavaSparkContext(session.sparkContext()); + return sc.parallelize(Arrays.asList(1, 2, 3)).count(); + } +}