Thrift-Server JDBC ResultSet Cursor Reset or Previous
Dear List,

Are there any future plans to implement cursor reset or previous-record functionality in the Thrift Server's JDBC driver? Are there any alternatives?

    java.sql.SQLException: Method not supported
        at org.apache.hive.jdbc.HiveBaseResultSet.previous(HiveBaseResultSet.java:643)

Regards,
Imran
--
I.R
Re: Reading CSV with multiLine option invalidates encoding option.
Hi,

The CSV source currently supports ASCII-compatible charsets, so I guess Shift-JIS should also work. You could check Hyukjin's comment in https://issues.apache.org/jira/browse/SPARK-21289 for more info.

--
Takeshi Yamamuro
Re: Reading CSV with multiLine option invalidates encoding option.
My apologies,

It was a problem with our Hadoop cluster. When we tested the same code on another cluster (HDP-based), it worked without any problem.

```shell
## make sjis text
cat a.txt
8月データだけでやってみよう
nkf -W -s a.txt >b.txt
cat b.txt
87n%G!<%?$@$1$G$d$C$F$_$h$&
nkf -s -w b.txt
8月データだけでやってみよう
hdfs dfs -put a.txt b.txt
```

```scala
// YARN mode test
spark.read.option("encoding", "utf-8").csv("a.txt").show(1)
+--+
| _c0|
+--+
|8月データだけでやってみよう|
+--+

spark.read.option("encoding", "sjis").csv("b.txt").show(1)
+--+
| _c0|
+--+
|8月データだけでやってみよう|
+--+

spark.read.option("encoding", "utf-8").option("multiLine", true).csv("a.txt").show(1)
+--+
| _c0|
+--+
|8月データだけでやってみよう|
+--+

spark.read.option("encoding", "sjis").option("multiLine", true).csv("b.txt").show(1)
+--+
| _c0|
+--+
|8月データだけでやってみよう|
+--+
```

I am still digging into the root cause and will share it later :-)

Best wishes,
Han-Cheol

--
==
Han-Cheol Cho, Ph.D.
Data scientist, Data Science Team, Data Laboratory
NHN Techorus Corp.

Homepage: https://sites.google.com/site/priancho/
==
Reading CSV with multiLine option invalidates encoding option.
Dear Spark ML members,

I ran into trouble using the "multiLine" option to load CSV data with Shift-JIS encoding. When option("multiLine", true) is specified, option("encoding", "encoding-name") no longer has any effect.

In CSVDataSource.scala, I found that the MultiLineCSVDataSource.readFile() method doesn't use parser.options.charset at all:

    object MultiLineCSVDataSource extends CSVDataSource {
      override val isSplitable: Boolean = false

      override def readFile(
          conf: Configuration,
          file: PartitionedFile,
          parser: UnivocityParser,
          schema: StructType): Iterator[InternalRow] = {
        UnivocityParser.parseStream(
          CodecStreams.createInputStreamWithCloseResource(conf, file.filePath),
          parser.options.headerFlag,
          parser,
          schema)
      }
      ...

On the other hand, the TextInputCSVDataSource.readFile() method uses it:

      override def readFile(
          conf: Configuration,
          file: PartitionedFile,
          parser: UnivocityParser,
          schema: StructType): Iterator[InternalRow] = {
        val lines = {
          val linesReader = new HadoopFileLinesReader(file, conf)
          Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
          linesReader.map { line =>
            new String(line.getBytes, 0, line.getLength, parser.options.charset)  // <-- charset option is used here.
          }
        }

        val shouldDropHeader = parser.options.headerFlag && file.start == 0
        UnivocityParser.parseIterator(lines, shouldDropHeader, parser, schema)
      }

It seems like a bug. Has anyone had the same problem before?

Best wishes,
Han-Cheol

--
==
Han-Cheol Cho, Ph.D.
Data scientist, Data Science Team, Data Laboratory
NHN Techorus Corp.

Homepage: https://sites.google.com/site/priancho/
==
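To make the suspected failure mode concrete, here is a minimal, Spark-independent sketch in plain Python. It only mimics the two code paths described above (one that applies the configured charset, one that ignores it); `decode_line` and its `charset` parameter are hypothetical names used for illustration and are not Spark APIs.

```python
# Illustrative only: contrasts "apply the charset option" with "ignore it".
# decode_line is a hypothetical helper, not part of Spark.

TEXT = "8月データだけでやってみよう"


def decode_line(raw: bytes, charset: str = "utf-8") -> str:
    """Decode raw bytes with the given charset, replacing undecodable bytes."""
    return raw.decode(charset, errors="replace")


# Bytes as they would be stored in a Shift-JIS encoded file.
sjis_bytes = TEXT.encode("shift_jis")

# Path that honours the encoding option.
with_charset = decode_line(sjis_bytes, "shift_jis")

# Path that ignores the option and falls back to UTF-8.
without_charset = decode_line(sjis_bytes)

print(with_charset == TEXT)
print(without_charset == TEXT)
```

If a reader behaved like the second path, the round trip would no longer reproduce the original text, which is the symptom reported here.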
Re: SPIP: Spark on Kubernetes
From our perspective, we have invested heavily in Kubernetes as our cluster manager of choice.

We also make quite heavy use of Spark. We've been experimenting with these builds (2.1 with PySpark enabled) quite heavily. Given that we've already 'paid the price' to operate Kubernetes in AWS, it seems rational to move our jobs over to Spark on k8s. Having this project merged into master will significantly ease keeping our Data Munging toolchain primarily on Spark.

Gary Lucas
Data Ops Team Lead
Unbounce

On 15 August 2017 at 15:52, Andrew Ash wrote:

> +1 (non-binding)
>
> We're moving large amounts of infrastructure from a combination of open
> source and homegrown cluster management systems to unify on Kubernetes and
> want to bring Spark workloads along with us.
>
> On Tue, Aug 15, 2017 at 2:29 PM, liyinan926 wrote:
>
>> +1 (non-binding)
Re: Restart streaming query spark 2.1 structured streaming
OK, thanks. A few more questions:

1. The documentation says the listener methods, including onQueryProgress, are not thread-safe. Is this method still the right place to refresh the cache? And is there no need to restart the query if I choose the listener?

   "The methods are not thread-safe as they may be called from different threads."
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala

2. If I use StreamingQueryListener.onQueryProgress, my understanding is that the method is executed only while the query is in progress. If I refresh the data frame there without restarting the query, will it impact the application?

3. Should I use the blocking unpersist(Boolean) method or the asynchronous unpersist(), given that the data size is big?

I feel your solution is better, as it stops the query --> refreshes the cache --> starts the query, provided I can compromise on a little downtime, even though the cached data frame is huge. I'm not sure how the listener behaves, as it's asynchronous. Correct me if I'm wrong.
Re: Restart streaming query spark 2.1 structured streaming
Both work. The asynchronous method with the listener will have less downtime; it's just that the first trigger/batch after the asynchronous unpersist+persist will probably take longer, as it has to reload the data.
Re: Restart streaming query spark 2.1 structured streaming
Thanks, Tathagata Das. Actually, I'm planning to do something like this:

    activeQuery.stop()

    // unpersist and persist cached data frame
    df.unpersist()

    // read the updated data; data size of df is around 100 GB
    df.persist()

    activeQuery = startQuery()

The cached data frame is around 100 GB, so the question is: is this the right place to refresh this huge cached data frame?

I'm also trying to refresh the cached data frame in the onQueryProgress() method of a class that extends StreamingQueryListener.

I would like to know which is the best place to refresh the cached data frame, and why.

Thanks again for the response below.
Hive Metastore open connections even after closing Spark context and session
Hi,

I am using Spark to query Hive, followed by transformations. My Scala app creates multiple Spark applications. A new Spark context (and session) is created only after closing the previous SparkSession and SparkContext.

However, on stopping sc and spark, the connections to the Hive Metastore (MySQL) are somehow not destroyed properly. For every Spark app I can see around 5 MySQL connections being created (with the old connections still active!). Eventually, MySQL starts rejecting new connections after 150 open connections.

How can I force Spark to close the Hive Metastore connections to MySQL (after spark.stop() and sc.stop())?

sc = Spark context
spark = SparkSession

Regards,
Rohit S Damkondwar
Re: Restart streaming query spark 2.1 structured streaming
You can do something like this.

    def startQuery(): StreamingQuery = {
      // create your streaming dataframes
      // start the query with the same checkpoint directory
    }

    // handle to the active query
    var activeQuery: StreamingQuery = null

    while (!stopped) {
      if (activeQuery == null) {           // if query not active, start query
        activeQuery = startQuery()
      } else if (shouldRestartQuery()) {   // check your condition and restart query
        activeQuery.stop()
        activeQuery = startQuery()
      }

      activeQuery.awaitTermination(100)    // wait for 100 ms.
      // if there is any error it will throw exception and quit the loop
      // otherwise it will keep checking the condition every 100ms
    }
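The control flow of the loop above can be exercised without Spark. The following is a rough sketch in plain Python, not Spark code: `FakeQuery` is a hypothetical stand-in for `StreamingQuery`, and `restart_requested` is an assumed way of implementing the restart condition as a flag that another thread (for example a listener callback) could set safely.

```python
# Pure-Python simulation of the polling/restart loop. FakeQuery is a
# hypothetical stand-in for StreamingQuery and is NOT a Spark class.
import threading


class FakeQuery:
    _next_id = 0

    def __init__(self):
        FakeQuery._next_id += 1
        self.id = FakeQuery._next_id
        self.active = True

    def stop(self):
        self.active = False

    def await_termination(self, timeout_s):
        # A real query would block for up to the timeout; here we only
        # report whether the query has already terminated.
        return not self.active


# Thread-safe flag; it may be set from a different thread than the loop.
restart_requested = threading.Event()


def run_loop(iterations):
    """Run the polling loop a fixed number of times; return started query ids."""
    started = []
    active = None
    for i in range(iterations):
        if active is None:
            # No query yet: start one.
            active = FakeQuery()
            started.append(active.id)
        elif restart_requested.is_set():
            # Condition met: stop, refresh cached data here, then restart.
            restart_requested.clear()
            active.stop()
            active = FakeQuery()
            started.append(active.id)
        active.await_termination(0.1)
        if i == 1:
            restart_requested.set()  # simulate the condition becoming true
    return started


started_ids = run_loop(5)
print(started_ids)
```

Keeping the stop/refresh/start sequence inside the single polling thread means the cached data is only replaced while no query is running; other threads merely set the flag.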
Re: Restart streaming query spark 2.1 structured streaming
Thanks, Michael.

I guess my question was a little confusing. Let me try again.

I would like to restart a streaming query programmatically, based on a condition, while my streaming application is running. Here is why I want to do this:

I want to refresh a cached data frame based on a condition, and the best way to do this is to restart the streaming query, as suggested by Tdas below for a similar problem:

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e

I do understand that checkpointing helps with recovery from failures, but I would like to know how to restart a streaming query programmatically without stopping my streaming application.

In place of query.awaitTermination(), do I need some logic to restart the query? Please suggest.
Re: Restart streaming query spark 2.1 structured streaming
See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

Though I think that this currently doesn't work with the console sink.
Re: Spark 2.2 streaming with append mode: empty output
The input dataset has multiple days' worth of data, so I thought the watermark should have been crossed. To debug, I changed the query to the code below. My expectation was that, since I am doing 1-day windows with late arrivals permitted for 1 second, it would output a row for the previous day as soon as it sees records for the next day.

When I run the code below with 'complete' output mode, I see the table and then the lastProgress output about 10 seconds later. When I run it with 'append' mode, the table has no rows, but the lastProgress output is the same.

Is withWatermark ignored in 'complete' and 'update' modes?

For append mode, I'm not able to understand why the watermark entry is still at 1970. "For a specific window starting at time T, the engine will maintain state and allow late data to update the state until (max event time seen by the engine - late threshold > T)". The table shows that we are seeing events with timestamp 2017-05-18. At that point the 2017-05-17 window can be closed and the row output, right?

    grouped_df = frame \
        .withWatermark("timestamp", "1 second") \
        .groupby(F.window("timestamp", "1 day")) \
        .agg(F.max("timestamp"))

    query = grouped_df.writeStream \
        .format("console") \
        .option("truncate", False) \
        .option("checkpointLocation", CKPT_LOC) \
        .outputMode("complete") \
        .start()

    import time
    i = 0
    while (i < 10):
        time.sleep(10)
        print(query.lastProgress)
        i += 1
    query.awaitTermination()

The output:

    +---------------------------------------------+-------------------+
    |window                                       |max(timestamp)     |
    +---------------------------------------------+-------------------+
    |[2017-05-17 00:00:00.0,2017-05-18 00:00:00.0]|2017-05-17 23:59:59|
    |[2017-05-15 00:00:00.0,2017-05-16 00:00:00.0]|2017-05-15 23:59:59|
    |[2017-05-14 00:00:00.0,2017-05-15 00:00:00.0]|2017-05-14 23:59:59|
    |[2017-05-16 00:00:00.0,2017-05-17 00:00:00.0]|2017-05-16 23:59:59|
    |[2017-05-07 00:00:00.0,2017-05-08 00:00:00.0]|2017-05-07 23:59:59|
    |[2017-05-19 00:00:00.0,2017-05-20 00:00:00.0]|2017-05-19 23:59:59|
    |[2017-05-18 00:00:00.0,2017-05-19 00:00:00.0]|2017-05-18 23:59:59|
    |[2017-05-20 00:00:00.0,2017-05-21 00:00:00.0]|2017-05-20 23:59:59|
    |[2017-05-08 00:00:00.0,2017-05-09 00:00:00.0]|2017-05-08 23:59:59|
    |[2017-05-10 00:00:00.0,2017-05-11 00:00:00.0]|2017-05-10 23:59:59|
    |[2017-05-13 00:00:00.0,2017-05-14 00:00:00.0]|2017-05-13 23:59:57|
    |[2017-05-21 00:00:00.0,2017-05-22 00:00:00.0]|2017-05-21 23:40:08|
    |[2017-05-09 00:00:00.0,2017-05-10 00:00:00.0]|2017-05-09 23:59:59|
    |[2017-05-12 00:00:00.0,2017-05-13 00:00:00.0]|2017-05-12 23:40:11|
    |[2017-05-11 00:00:00.0,2017-05-12 00:00:00.0]|2017-05-11 23:59:59|
    +---------------------------------------------+-------------------+

    {u'stateOperators': [{u'numRowsTotal': 15, u'numRowsUpdated': 0}],
     u'eventTime': {u'watermark': u'1970-01-01T00:00:00.000Z'},
     u'name': None,
     u'timestamp': u'2017-08-15T18:13:54.381Z',
     u'processedRowsPerSecond': 0.0,
     u'inputRowsPerSecond': 0.0,
     u'numInputRows': 0,
     u'sources': [{u'description': u'FileStreamSource[hdfs://some_ip/some_path]',
                   u'endOffset': {u'logOffset': 0},
                   u'processedRowsPerSecond': 0.0,
                   u'inputRowsPerSecond': 0.0,
                   u'numInputRows': 0,
                   u'startOffset': {u'logOffset': 0}}],
     u'durationMs': {u'getOffset': 57, u'triggerExecution': 57},
     u'runId': u'a5b75404-c774-49db-aac5-2592211417ca',
     u'id': u'35ad86ec-f608-40b5-a48b-9507c82a87c8',
     u'sink': {u'description': u'org.apache.spark.sql.execution.streaming.ConsoleSink@7e4050cd'}}

On Mon, Aug 14, 2017 at 4:55 PM, Tathagata Das wrote:

> In append mode, the aggregation outputs a row only when the watermark has
> been crossed and the corresponding aggregate is *final*, that is, will not
> be updated any more.
> See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
>
> On Mon, Aug 14, 2017 at 4:09 PM, Ashwin Raju wrote:
>
>> Hi,
>>
>> I am running Spark 2.2 and trying out structured streaming. I have the
>> following code:
>>
>>     from pyspark.sql import functions as F
>>
>>     df = frame \
>>         .withWatermark("timestamp", "1 minute") \
>>         .groupby(F.window("timestamp", "1 day"), *groupby_cols) \
>>         .agg(F.sum('bytes'))
>>
>>     query = df.writeStream \
>>         .format("console") \
>>         .option("checkpointLocation", '\some\chkpoint') \
>>         .outputMode("complete") \
>>         .start()
>>
>>     query.awaitTermination()
>>
>> It prints out a bunch of aggregated rows to console. When I run the same
>> query with outputMode("append") however, the output only has the column
>> names, no rows. I was originally trying to output to parquet, which only
>> supports append mode. I was seeing no data in my parquet files, so I
>> switched to console output to debug, then noticed this issue. Am I
>> misunderstanding something about how append mode works?
>>
>> Thanks,
>>
>> Ashwin
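One possible reading of the output above, offered as an assumption rather than a confirmed diagnosis: if the watermark only advances after a trigger has finished, and all input files are consumed in a single trigger, then append mode never gets a later trigger in which to emit the windows that have since become final. The plain-Python sketch below simulates that simplified rule. `simulate_append` and `window_end` are hypothetical helpers, and the model (1-day tumbling windows, a window emitted once the watermark reaches its end) is a deliberate simplification of what the engine does.

```python
# Simplified, Spark-independent model of watermark handling in append mode.
# All names here are hypothetical; this is not how Spark is implemented.
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_end(ts):
    """End of the 1-day tumbling window that contains ts."""
    start = datetime(ts.year, ts.month, ts.day)
    return start + timedelta(days=1)


def simulate_append(batches, delay=timedelta(seconds=1)):
    """Return, for each batch, the window ends emitted in append mode.

    Assumed model: the watermark used by a batch is derived from event
    times seen in *earlier* batches only, and a window is emitted once,
    when the watermark has reached the window's end.
    """
    watermark = EPOCH
    max_event_time = EPOCH
    open_windows = set()
    emitted_per_batch = []
    for batch in batches:
        for ts in batch:
            if window_end(ts) > watermark:  # ignore data that is too late
                open_windows.add(window_end(ts))
        final = sorted(w for w in open_windows if w <= watermark)
        open_windows.difference_update(final)
        emitted_per_batch.append(final)
        # The watermark advances only after the batch has been processed.
        if batch:
            max_event_time = max(max_event_time, max(batch))
        watermark = max(watermark, max_event_time - delay)
    return emitted_per_batch


day17 = datetime(2017, 5, 17, 23, 59, 59)
day18 = datetime(2017, 5, 18, 23, 59, 59)
day19 = datetime(2017, 5, 19, 12, 0, 0)

one_batch = simulate_append([[day17, day18]])
two_batches = simulate_append([[day17, day18], [day19]])
print(one_batch)
print(two_batches)
```

Under this model, a single batch emits nothing even though it contains data from two days, while a second batch arriving later causes the 2017-05-17 window to be emitted.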
How to force Spark Kafka Direct to start from the latest offset when the lag is huge in kafka 10?
Hi,

How can I force Spark Kafka Direct to start from the latest offset when the lag is huge in Kafka 10? It seems to be processing from the latest offset stored for a group id. One way to do this is to change the group id, but that would mean providing a new group id each time we need the job to start from the latest offset. Is there a way to force the job to run from the latest offset when we need to, while still using the same group id?

Thanks!
Restart streaming query spark 2.1 structured streaming
Hi,

I'm trying to restart a streaming query to refresh a cached data frame.

Where and how should I restart the streaming query?

    val sparkSes = SparkSession
      .builder
      .config("spark.master", "local")
      .appName("StreamingCahcePoc")
      .getOrCreate()

    import sparkSes.implicits._

    val dataDF = sparkSes.readStream
      .schema(streamSchema)
      .csv("testData")

    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
Re: DAGScheduler - two runtimes
The ResultStage time is the time taken by your job's last stage. The line

    Job 13 finished: reduce at VertexRDDImpl.scala:90, took 0.035546 s

reports the time taken by the whole job.

2017-08-14 18:58 GMT+08:00 Kaepke, Marc:

> Hi everyone,
>
> I’m a Spark newbie and have one question:
> What is the difference between the duration measures in my log/console
> output?
>
> 17/08/14 12:48:58 INFO DAGScheduler: ResultStage 232 (reduce at
> VertexRDDImpl.scala:90) finished in 0.026 s
> 17/08/14 12:48:58 INFO DAGScheduler: Job 13 finished: reduce at
> VertexRDDImpl.scala:90, took 0.035546 s
>
> I need the total runtime of the job in ms. Or seconds with decimal
>
> Thanks!
>
> Best
> Marc
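If the total job runtime is needed in milliseconds, one low-tech option is to parse the "Job N finished" lines from the driver log. The sketch below is plain Python and assumes the log format shown in the quoted message; `job_durations_ms` and `JOB_RE` are hypothetical names introduced for illustration.

```python
# Extract per-job durations (in ms) from DAGScheduler log lines.
# Assumes the "Job N finished: ..., took X s" format quoted above.
import re

LOG_LINES = [
    "17/08/14 12:48:58 INFO DAGScheduler: ResultStage 232 (reduce at "
    "VertexRDDImpl.scala:90) finished in 0.026 s",
    "17/08/14 12:48:58 INFO DAGScheduler: Job 13 finished: reduce at "
    "VertexRDDImpl.scala:90, took 0.035546 s",
]

JOB_RE = re.compile(r"Job (\d+) finished: .*, took ([0-9.]+) s")


def job_durations_ms(lines):
    """Map job id -> total job duration in milliseconds."""
    durations = {}
    for line in lines:
        match = JOB_RE.search(line)
        if match:
            durations[int(match.group(1))] = float(match.group(2)) * 1000.0
    return durations


durations = job_durations_ms(LOG_LINES)
print(durations)
```

The stage line is skipped on purpose, since it only covers the last stage rather than the whole job.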
How to calculate CPU time for a Spark job?
How can I calculate the CPU time of a Spark job? Is there an interface that can be called directly, like the "CPU time spent (ms)" value that the Hadoop MapReduce framework provides in its Counters? Thanks!