[ https://issues.apache.org/jira/browse/SPARK-24593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lhq updated SPARK-24593:
------------------------
Environment: 
{code:java}
// demo code
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
 * network every second.
 *
 * Usage: JavaSqlNetworkWordCount <hostname> <port>
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *   `$ nc -lk 9999`
 * and then run the example
 *   `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999`
 */
public final class JavaSqlNetworkWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();

    // Create the context with a 1 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

    // Create a JavaReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
        args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD((rdd, time) -> {
      SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

      // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
      JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
        JavaRecord record = new JavaRecord();
        record.setWord(word);
        return record;
      });
      Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words");

      // Do word count on table using SQL and print it
      Dataset<Row> wordCountsDataFrame =
          spark.sql("select word, count(*) as total from words group by word");
      System.out.println("========= " + time + "=========");
      wordCountsDataFrame.show();

      // access hive table failed
      spark.sql("select * from test_fch");
    });

    ssc.start();
    ssc.awaitTermination();
  }
}

/** Lazily instantiated singleton instance of SparkSession */
class JavaSparkSessionSingleton {
  private static transient SparkSession instance = null;

  public static SparkSession getInstance(SparkConf sparkConf) {
    if (instance == null) {
      instance = SparkSession
          .builder()
          .config(sparkConf)
          .enableHiveSupport()
          .getOrCreate();
    }
    return instance;
  }
}
{code}
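The code above also relies on two helpers from the Spark examples package that are not included in the snippet: StreamingExamples.setStreamingLogLevels() and the JavaRecord bean passed to createDataFrame. For completeness, a minimal JavaRecord sketch along the lines of the bean shipped with the stock Spark examples (a serializable JavaBean with a single "word" property) is shown below; the exact class in the reporter's build may differ:

{code:java}
package org.apache.spark.examples.streaming;

/** Minimal serializable JavaBean backing the "words" DataFrame (one String column named "word"). */
public class JavaRecord implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}
{code}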
> can not find hive table after spark streaming started
> -----------------------------------------------------
>
> Key: SPARK-24593
> URL: https://issues.apache.org/jira/browse/SPARK-24593
> Project: Spark
> Issue Type: Bug
> Components: DStreams, SQL
> Affects Versions: 2.3.0, 2.3.1
> Environment: (demo code above)
> Reporter: lhq
> Priority: Major
>
> org.apache.spark.sql.AnalysisException: Table or view not found: test_fch; line 1 pos 14
> at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:665)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveRelation(Analyzer.scala:617)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:647)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:640)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:640)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:586)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
> at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
> at scala.collection.immutable.List.foldLeft(List.scala:84)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
> at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
> at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
> at org.apache.spark.examples.streaming.JavaSqlNetworkWordCount.lambda$main$3dd8454f$1(JavaSqlNetworkWordCount.java:89)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at scala.util.Try$.apply(Try.scala:192)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'test_fch' not found in database 'default';
> at org.apache.spark.sql.catalyst.catalog.ExternalCatalog.requireTableExists(ExternalCatalog.scala:46)
> at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.getTable(InMemoryCatalog.scala:326)
> at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupRelation(SessionCatalog.scala:669)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:662)
> ... 51 more
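Note that the "Caused by" frames go through InMemoryCatalog rather than a Hive catalog, which suggests the lazily created SparkSession ended up with the default in-memory catalog even though enableHiveSupport() was called: the catalog implementation is read from the SparkContext's configuration, which here is fixed when the JavaStreamingContext creates the context, so a builder option applied afterwards may not take effect. A minimal, untested sketch of a possible workaround, assuming Hive classes and hive-site.xml are on the classpath, is to request the Hive catalog on the SparkConf before the streaming context is created (the class name below is hypothetical):

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Hypothetical sketch, not part of the original report.
public final class HiveCatalogStreamingSketch {
  public static void main(String[] args) throws Exception {
    // Sketch only: select the Hive catalog before the SparkContext exists, so the
    // SparkSession later obtained via SparkSession.builder()...getOrCreate() inside
    // foreachRDD resolves tables against the Hive metastore instead of InMemoryCatalog.
    SparkConf sparkConf = new SparkConf()
        .setAppName("JavaSqlNetworkWordCount")
        .set("spark.sql.catalogImplementation", "hive");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

    // ... build the DStream pipeline exactly as in the Environment code above ...

    ssc.start();
    ssc.awaitTermination();
  }
}
{code}

The same setting can also be supplied on the command line, e.g. --conf spark.sql.catalogImplementation=hive with spark-submit, instead of in code.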