[ https://issues.apache.org/jira/browse/SPARK-24592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-24592. ---------------------------------- Resolution: Duplicate Looks like a duplicate of SPARK-24593. > can not find hive table after spark streaming start > --------------------------------------------------- > > Key: SPARK-24592 > URL: https://issues.apache.org/jira/browse/SPARK-24592 > Project: Spark > Issue Type: Bug > Components: DStreams, SQL > Affects Versions: 2.3.0, 2.3.1 > Environment: > {code:java} > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > package org.apache.spark.examples.streaming; > import java.util.Arrays; > import java.util.regex.Pattern; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.sql.Dataset; > import org.apache.spark.sql.Row; > import org.apache.spark.sql.SparkSession; > import org.apache.spark.api.java.StorageLevels; > import org.apache.spark.streaming.Durations; > import org.apache.spark.streaming.api.java.JavaDStream; > import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; > import org.apache.spark.streaming.api.java.JavaStreamingContext; > /** > * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text > received from the > * network every second. 
> * > * Usage: JavaSqlNetworkWordCount <hostname> <port> > * <hostname> and <port> describe the TCP server that Spark Streaming would > connect to receive data. > * > * To run this on your local machine, you need to first run a Netcat server > * `$ nc -lk 9999` > * and then run the example > * `$ bin/run-example > org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999` > */ > public final class JavaSqlNetworkWordCount { > private static final Pattern SPACE = Pattern.compile(" "); > public static void main(String[] args) throws Exception { > if (args.length < 2) { > System.err.println("Usage: JavaNetworkWordCount <hostname> <port>"); > System.exit(1); > } > StreamingExamples.setStreamingLogLevels(); > // Create the context with a 1 second batch size > SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount"); > JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, > Durations.seconds(1)); > // Create a JavaReceiverInputDStream on target ip:port and count the > // words in input stream of \n delimited text (eg. generated by 'nc') > // Note that no duplication in storage level only for running locally. > // Replication necessary in distributed scenario for fault tolerance. 
> JavaReceiverInputDStream<String> lines = ssc.socketTextStream( > args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); > JavaDStream<String> words = lines.flatMap(x -> > Arrays.asList(SPACE.split(x)).iterator()); > // Convert RDDs of the words DStream to DataFrame and run SQL query > words.foreachRDD((rdd, time) -> { > SparkSession spark = > JavaSparkSessionSingleton.getInstance(rdd.context().getConf()); > // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame > JavaRDD<JavaRecord> rowRDD = rdd.map(word -> { > JavaRecord record = new JavaRecord(); > record.setWord(word); > return record; > }); > Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class); > // Creates a temporary view using the DataFrame > wordsDataFrame.createOrReplaceTempView("words"); > // Do word count on table using SQL and print it > Dataset<Row> wordCountsDataFrame = > spark.sql("select word, count(*) as total from words group by word"); > System.out.println("========= " + time + "========="); > wordCountsDataFrame.show(); > spark.sql("select * from test_fch"); > }); > ssc.start(); > ssc.awaitTermination(); > } > } > /** Lazily instantiated singleton instance of SparkSession */ > class JavaSparkSessionSingleton { > private static transient SparkSession instance = null; > public static SparkSession getInstance(SparkConf sparkConf) { > if (instance == null) { > instance = SparkSession > .builder() > .config(sparkConf) > .enableHiveSupport() > .getOrCreate(); > } > return instance; > } > } > {code} > > Reporter: lhq > Priority: Major > > org.apache.spark.sql.AnalysisException: Table or view not found: test_fch; > line 1 pos 14 > at > org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:665) > at > 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveRelation(Analyzer.scala:617) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:647) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:640) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:640) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:586) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84) > at > scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) > at scala.collection.immutable.List.foldLeft(List.scala:84) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84) > at > 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76) > at > org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124) > at > org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118) > at > org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103) > at > org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57) > at > org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55) > at > org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47) > at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74) > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) > at > org.apache.spark.examples.streaming.JavaSqlNetworkWordCount.lambda$main$3dd8454f$1(JavaSqlNetworkWordCount.java:89) > at > org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280) > at > org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > 
at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) > at scala.util.Try$.apply(Try.scala:192) > at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table > or view 'test_fch' not found in database 'default'; > at > org.apache.spark.sql.catalyst.catalog.ExternalCatalog.requireTableExists(ExternalCatalog.scala:46) > at > org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.getTable(InMemoryCatalog.scala:326) > at > org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupRelation(SessionCatalog.scala:669) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:662) > ... 51 more -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org