The same exception causes different behavior

2018-04-16 Thread big data
Hi all,

we have two environments for a Spark Streaming job that consumes a Kafka
topic to do calculations.

Now in one environment, the Spark Streaming job consumes a non-standard record
from Kafka and throws an exception (not caught in the code), and then the
streaming job goes down.

But in the other environment, the Spark Streaming job also throws an exception
(the same exception message in the log file), yet the streaming job keeps
running and continues consuming other data.

Is there some parameter or configuration for this behavior? Why is one job
down while the other is still running?
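
A minimal sketch of one common mitigation, assuming JSON payloads and a per-partition processing function (both purely illustrative), is to parse each record defensively so a single non-standard record cannot raise an uncaught exception and kill the job. This does not explain why the two environments differ; comparing Spark versions and any custom exception handling between them would be the first thing to check.

```
import json

def parse_record(raw_value):
    """Parse one Kafka record; return None instead of raising on bad input."""
    try:
        return json.loads(raw_value)   # stand-in for the job's real parsing logic
    except (ValueError, TypeError):
        return None                    # or route the record to a dead-letter store

def process_partition(records):
    for raw in records:
        parsed = parse_record(raw)
        if parsed is None:
            continue                   # skip the non-standard record, keep the job alive
        # ... normal calculation on `parsed` goes here ...
```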






Re: can we use mapGroupsWithState in raw sql?

2018-04-16 Thread Tathagata Das
Unfortunately, no. Honestly, it does not make sense: for type-aware
operations like map, mapGroups, etc., you have to provide an actual JVM
function, and that does not fit in with the SQL language structure.

On Mon, Apr 16, 2018 at 7:34 PM, kant kodali  wrote:

> Hi All,
>
> can we use mapGroupsWithState in raw SQL? or is it in the roadmap?
>
> Thanks!
>
>
>


unsubscribe

2018-04-16 Thread 韩盼
unsubscribe





pyspark execution

2018-04-16 Thread anudeep
Hi All,

I have a python file which I am executing directly with spark-submit
command.

Inside the Python file, I have SQL written using HiveContext. I created a
generic variable for the database name inside the SQL.

The problem is: how can I pass the value for this variable dynamically,
just as we do in Hive with the --hivevar parameter?
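
A minimal sketch of one common approach, assuming the database name is passed as an ordinary application argument to spark-submit (the --dbname flag, app name, and table name below are illustrative, not an existing convention), shown with SparkSession for brevity:

```
# spark-submit my_job.py --dbname analytics_db
import argparse

from pyspark.sql import SparkSession

parser = argparse.ArgumentParser()
parser.add_argument("--dbname", required=True)   # plays the role of --hivevar dbname=...
args = parser.parse_args()

spark = SparkSession.builder \
    .appName("parameterized-hive-sql") \
    .enableHiveSupport() \
    .getOrCreate()

# Substitute the database name into the query text before running it.
query = "SELECT COUNT(*) FROM {db}.some_table".format(db=args.dbname)
spark.sql(query).show()
```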

Thanks!
Anudeep


can we use mapGroupsWithState in raw sql?

2018-04-16 Thread kant kodali
Hi All,

can we use mapGroupsWithState in raw SQL? or is it in the roadmap?

Thanks!


Re: Warning from user@spark.apache.org

2018-04-16 Thread Prasad Velagaleti
Hello,
   I got a message saying that messages sent to me (my Gmail ID) from the
mailing list bounced.
Wonder why?

thanks,
Prasad.

On Mon, Apr 16, 2018 at 6:16 PM,  wrote:

> Hi! This is the ezmlm program. I'm managing the
> user@spark.apache.org mailing list.
>
>
> Messages to you from the user mailing list seem to
> have been bouncing. I've attached a copy of the first bounce
> message I received.
>
> If this message bounces too, I will send you a probe. If the probe bounces,
> I will remove your address from the user mailing list,
> without further notice.
>
>
> I've kept a list of which messages from the user mailing list have
> bounced from your address.
>
> Copies of these messages may be in the archive.
> To retrieve a set of messages 123-145 (a maximum of 100 per request),
> send a short message to:
>
>
> To receive a subject and author list for the last 100 or so messages,
> send a short message to:
>
>
> Here are the message numbers:
>
>74336
>
> --- Enclosed is a copy of the bounce message I received.
>
> Return-Path: <>
> Received: (qmail 55901 invoked for bounce); 6 Apr 2018 23:03:41 -
> Date: 6 Apr 2018 23:03:41 -
> From: mailer-dae...@apache.org
> To: user-return-743...@spark.apache.org
> Subject: failure notice
>
>


[Spark 2.x Core] Job writing out an extra empty part-0000* file

2018-04-16 Thread klrmowse
The Spark job succeeds (and with correct output), except there is always an
extra part-* file, and it is empty.

I even set the number of partitions to only 2 via spark-submit, but a third,
empty part-file still shows up.

Why does it do that, and how can I fix it?
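
A likely cause (a guess, since the job itself isn't shown) is a partition that ends up with no records: each partition writes its own part-* file whether or not it holds data. A minimal sketch of one workaround, coalescing just before the write (the data and output path are illustrative):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("no-empty-part-files").getOrCreate()

df = spark.range(0, 100)          # stand-in for the real job's output
df = df.filter(df["id"] >= 95)    # after this filter, most partitions hold no rows

# Every partition writes its own part-* file, empty or not; coalescing just
# before the write merges partitions so no empty output file is produced.
df.coalesce(1).write.mode("overwrite").csv("/tmp/job-output")
```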



Thank you







Re: Driver aborts on Mesos when unable to connect to one of external shuffle services

2018-04-16 Thread igor.berman
Hi Szuromi,
We manage the external shuffle service with Marathon rather than manually.
Sometimes though, e.g. when adding a new node to the cluster, there is some delay
between Mesos scheduling tasks on a slave and Marathon scheduling the external
shuffle service task on that node.







Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Aakash Basu
Hi Gerard,

"If your actual source is Kafka, the original solution of using
`spark.streams.awaitAnyTermination`  should solve the problem."

I tried literally everything; nothing worked out.

1) Tried nc on two different ports for two different streams; still nothing
worked.

2) Tried the same using Kafka with awaitAnyTermination; still no use, the first
stream write kept blocking the second... (And inner queries with
aggregation don't seem to work in Structured Streaming, as each expects a
separate writeStream.start()).

Any insight (or a direct update to the code) would be helpful.

Thanks,
Aakash.

On Mon 16 Apr, 2018, 9:05 PM Gerard Maas,  wrote:

> Aakash,
>
> There are two issues here.
> The issue with the code on the first question is that the first query
> blocks and the code for the second does not get executed. Panagiotis
> pointed this out correctly.
> In the updated code, the issue is related to netcat (nc) and the way
> structured streaming works. As far as I remember, netcat only delivers data
> to the first network connection.
> On the structured streaming side, each query will issue its own
> connection. This results in only the first query getting the data.
> If you would talk to a TCP server supporting multiple connected clients,
> you would see data in both queries.
>
> If your actual source is Kafka, the original solution of using
> `spark.streams.awaitAnyTermination`  should solve the problem.
>
> -kr, Gerard.
>
>
>
> On Mon, Apr 16, 2018 at 10:52 AM, Aakash Basu 
> wrote:
>
>> Hey Jayesh and Others,
>>
>> Is there then, any other way to come to a solution for this use-case?
>>
>> Thanks,
>> Aakash.
>>
>> On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
>> jayesh.lalw...@capitalone.com> wrote:
>>
>>> Note that what you are trying to do here is join a streaming data frame
>>> with an aggregated streaming data frame. As per the documentation, joining
>>> an aggregated streaming data frame with another streaming data frame is not
>>> supported
>>>
>>>
>>>
>>>
>>>
>>> *From: *spark receiver 
>>> *Date: *Friday, April 13, 2018 at 11:49 PM
>>> *To: *Aakash Basu 
>>> *Cc: *Panagiotis Garefalakis , user <
>>> user@spark.apache.org>
>>> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>>>
>>>
>>>
>>> Hi Panagiotis ,
>>>
>>>
>>>
>>> Wondering you solved the problem or not? Coz I met the same issue today.
>>> I’d appreciate  so much if you could paste the code snippet  if it’s
>>> working .
>>>
>>>
>>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>>
>>> 在 2018年4月6日,上午7:40,Aakash Basu  写道:
>>>
>>>
>>>
>>> Hi Panagiotis,
>>>
>>> I did that, but it still prints the result of the first query and awaits
>>> for new data, doesn't even goes to the next one.
>>>
>>> *Data -*
>>>
>>> $ nc -lk 9998
>>>
>>> 1,2
>>> 3,4
>>> 5,6
>>> 7,8
>>>
>>> *Result -*
>>>
>>> ---
>>> Batch: 0
>>> ---
>>> ++
>>> |aver|
>>> ++
>>> | 3.0|
>>> ++
>>>
>>> ---
>>> Batch: 1
>>> ---
>>> ++
>>> |aver|
>>> ++
>>> | 4.0|
>>> ++
>>>
>>>
>>>
>>> *Updated Code -*
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import split
>>>
>>> spark = SparkSession \
>>> .builder \
>>> .appName("StructuredNetworkWordCount") \
>>> .getOrCreate()
>>>
>>> data = spark \
>>> .readStream \
>>> .format("socket") \
>>> .option("header","true") \
>>> .option("host", "localhost") \
>>> .option("port", 9998) \
>>> .load("csv")
>>>
>>>
>>> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
>>> split(data.value, ",").getItem(1).alias("col2"))
>>>
>>> id_DF.createOrReplaceTempView("ds")
>>>
>>> df = spark.sql("select avg(col1) as aver from ds")
>>>
>>> df.createOrReplaceTempView("abcd")
>>>
>>> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) 
>>> col3 from ds")  # (select aver from abcd)
>>>
>>> query2 = df \
>>> .writeStream \
>>> .format("console") \
>>> .outputMode("complete") \
>>> .trigger(processingTime='5 seconds') \
>>> .start()
>>>
>>> query = wordCounts \
>>> .writeStream \
>>> .format("console") \
>>> .trigger(processingTime='5 seconds') \
>>> .start()
>>>
>>> spark.streams.awaitAnyTermination()
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Aakash.
>>>
>>>
>>>
>>> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <
>>> panga...@gmail.com> wrote:
>>>
>>> Hello Aakash,
>>>
>>>
>>>
>>> When you use query.awaitTermination you are pretty much blocking there
>>> waiting for the current query to stop or throw an exception. In your case
>>> the second query will not even start.
>>>
>>> What you could do instead is remove all the blocking calls and use
>>> spark.streams.awaitAnyTermination instead 

Re: PySpark ML: Get best set of parameters from TrainValidationSplit

2018-04-16 Thread Bryan Cutler
Hi Aakash,

First you will want to get the random forest model stage from the best
pipeline model result; for example, if RF is the first stage:

rfModel = model.bestModel.stages[0]

Then you can check the values of the params you tuned like this:

rfModel.getNumTrees
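
A short sketch building on that, assuming `model` is the TrainValidationSplitModel returned by tvs.fit() in the code quoted below, and that the evaluator's metric is one where larger is better (accuracy here):

```
import numpy as np

# Index of the parameter combination with the best validation metric
best_idx = int(np.argmax(model.validationMetrics))
best_params = model.getEstimatorParamMaps()[best_idx]
print({param.name: value for param, value in best_params.items()})

# Or read individual values off the best RF stage, as described above
rfModel = model.bestModel.stages[0]
print(rfModel.getNumTrees)
```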

On Mon, Apr 16, 2018 at 7:52 AM, Aakash Basu 
wrote:

> Hi,
>
> I am running a Random Forest model on a dataset using hyper parameter
> tuning with Spark's paramGrid and Train Validation Split.
>
> Can anyone tell me how to get the best set for all the four parameters?
>
> I used:
>
> model.bestModel()
> model.metrics()
>
>
> But none of them seem to work.
>
>
> Below is the code chunk:
>
> paramGrid = ParamGridBuilder() \
> .addGrid(rf.numTrees, [50, 100, 150, 200]) \
> .addGrid(rf.maxDepth, [5, 10, 15, 20]) \
> .addGrid(rf.minInfoGain, [0.001, 0.01, 0.1, 0.6]) \
> .addGrid(rf.minInstancesPerNode, [5, 15, 30, 50, 100]) \
> .build()
>
> tvs = TrainValidationSplit(estimator=pipeline,
>estimatorParamMaps=paramGrid,
>evaluator=MulticlassClassificationEvaluator(),
># 80% of the data will be used for training, 20% 
> for validation.
>trainRatio=0.8)
>
> model = tvs.fit(trainingData)
>
> predictions = model.transform(testData)
>
> evaluator = MulticlassClassificationEvaluator(
> labelCol="label", predictionCol="prediction", metricName="accuracy")
> accuracy = evaluator.evaluate(predictions)
> print("Accuracy = %g" % accuracy)
> print("Test Error = %g" % (1.0 - accuracy))
>
>
> Any help?
>
>
> Thanks,
> Aakash.
>


Structured streaming: Tried to fetch $offset but the returned record offset was ${record.offset}"

2018-04-16 Thread ARAVIND SETHURATHNAM
Hi,
We have several Structured Streaming jobs (Spark version 2.2.0) consuming from 
Kafka and writing to S3. They were running fine for a month; since yesterday a 
few jobs started failing, and I see the exception below in the failed jobs' log:

```Tried to fetch 473151075 but the returned record offset was 473151072```
```DAGScheduler: ResultStage 0 (start at SparkStreamingTask.java:222) failed in 
77.546 s due to Job aborted due to stage failure: Task 86 in stage 0.0 failed 4 
times, most recent failure: Lost task 86.3 in stage 0.0 (TID 96, 
ip-10-120-12-52.ec2.internal, executor 11): java.lang.IllegalStateException: 
Tried to fetch 473151075 but the returned record offset was 473151072
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(CachedKafkaConsumer.scala:234)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:158)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:149)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
```

Can someone provide some direction on what could be causing this all of a sudden 
when consuming from those topics?
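
Not an answer, but one hedged way to gather data points (broker and topic names are illustrative): batch-read the topic and compare the offsets Kafka still retains per partition with the offsets the failing query is trying to fetch, to rule retention or compaction in or out before digging further.

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import min as min_, max as max_

spark = SparkSession.builder.appName("kafka-offset-check").getOrCreate()

# Note: this scans the whole topic, so keep it to a test run on large topics.
retained = (spark.read.format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("subscribe", "the-topic")
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .load()
            .groupBy("partition")
            .agg(min_("offset").alias("earliest_retained"),
                 max_("offset").alias("latest_retained")))

retained.show()
```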

regards

Aravind



Curious case of Spark SQL 2.3 - number of stages different for the same query ever?

2018-04-16 Thread Jacek Laskowski
Hi,

I've got a case where the same structured query (it's a union) gives 1 stage
for one run and 5 stages for another. I could not find any pattern yet (and
it's hard to reproduce due to the volume and the application), but I'm
pretty certain that it should *never* be possible for Spark 2.3 to come up with
1 vs 5 stages for the very same query plan (even if I changed the number of
executors or cores or anything execution-related).

So my question is, is this possible that Spark SQL could give 1-stage
execution plan and 5-stage execution plan for the very same query?

(I am not saying that I'm 100% sure the query is indeed the same, since I'm
still working on a reproducible test case, and only when I have it will I
really be sure.)

Sorry for the vague description, but I've got nothing more to share yet.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Gerard Maas
Aakash,

There are two issues here.
The issue with the code on the first question is that the first query
blocks and the code for the second does not get executed. Panagiotis
pointed this out correctly.
In the updated code, the issue is related to netcat (nc) and the way
structured streaming works. As far as I remember, netcat only delivers data
to the first network connection.
On the structured streaming side, each query will issue its own connection.
This results in only the first query getting the data.
If you would talk to a TCP server supporting multiple connected clients,
you would see data in both queries.

If your actual source is Kafka, the original solution of using
`spark.streams.awaitAnyTermination`  should solve the problem.
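
For reference, a minimal self-contained sketch of that pattern (broker address, topic name, and parsing are illustrative, mirroring the code quoted below): both start() calls are issued before the single blocking call.

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder.appName("two-sinks-one-source").getOrCreate()

data = (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test1")
        .load()
        .selectExpr("CAST(value AS STRING) AS value"))

parsed = data.select(split(col("value"), ",").getItem(0).cast("double").alias("col1"),
                     split(col("value"), ",").getItem(1).cast("double").alias("col2"))

# Start BOTH queries first; start() does not block.
raw_query = parsed.writeStream.format("console").queryName("raw").start()
avg_query = (parsed.groupBy().avg("col1")
             .writeStream.format("console").outputMode("complete")
             .queryName("avg").start())

# Then block once, on whichever query terminates first.
spark.streams.awaitAnyTermination()
```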

-kr, Gerard.



On Mon, Apr 16, 2018 at 10:52 AM, Aakash Basu 
wrote:

> Hey Jayesh and Others,
>
> Is there then, any other way to come to a solution for this use-case?
>
> Thanks,
> Aakash.
>
> On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
>> Note that what you are trying to do here is join a streaming data frame
>> with an aggregated streaming data frame. As per the documentation, joining
>> an aggregated streaming data frame with another streaming data frame is not
>> supported
>>
>>
>>
>>
>>
>> *From: *spark receiver 
>> *Date: *Friday, April 13, 2018 at 11:49 PM
>> *To: *Aakash Basu 
>> *Cc: *Panagiotis Garefalakis , user <
>> user@spark.apache.org>
>> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>>
>>
>>
>> Hi Panagiotis ,
>>
>>
>>
>> Wondering you solved the problem or not? Coz I met the same issue today.
>> I’d appreciate  so much if you could paste the code snippet  if it’s
>> working .
>>
>>
>>
>> Thanks.
>>
>>
>>
>>
>>
>> 在 2018年4月6日,上午7:40,Aakash Basu  写道:
>>
>>
>>
>> Hi Panagiotis,
>>
>> I did that, but it still prints the result of the first query and awaits
>> for new data, doesn't even goes to the next one.
>>
>> *Data -*
>>
>> $ nc -lk 9998
>>
>> 1,2
>> 3,4
>> 5,6
>> 7,8
>>
>> *Result -*
>>
>> ---
>> Batch: 0
>> ---
>> ++
>> |aver|
>> ++
>> | 3.0|
>> ++
>>
>> ---
>> Batch: 1
>> ---
>> ++
>> |aver|
>> ++
>> | 4.0|
>> ++
>>
>>
>>
>> *Updated Code -*
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import split
>>
>> spark = SparkSession \
>> .builder \
>> .appName("StructuredNetworkWordCount") \
>> .getOrCreate()
>>
>> data = spark \
>> .readStream \
>> .format("socket") \
>> .option("header","true") \
>> .option("host", "localhost") \
>> .option("port", 9998) \
>> .load("csv")
>>
>>
>> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
>> split(data.value, ",").getItem(1).alias("col2"))
>>
>> id_DF.createOrReplaceTempView("ds")
>>
>> df = spark.sql("select avg(col1) as aver from ds")
>>
>> df.createOrReplaceTempView("abcd")
>>
>> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
>> from ds")  # (select aver from abcd)
>>
>> query2 = df \
>> .writeStream \
>> .format("console") \
>> .outputMode("complete") \
>> .trigger(processingTime='5 seconds') \
>> .start()
>>
>> query = wordCounts \
>> .writeStream \
>> .format("console") \
>> .trigger(processingTime='5 seconds') \
>> .start()
>>
>> spark.streams.awaitAnyTermination()
>>
>>
>>
>> Thanks,
>>
>> Aakash.
>>
>>
>>
>> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <
>> panga...@gmail.com> wrote:
>>
>> Hello Aakash,
>>
>>
>>
>> When you use query.awaitTermination you are pretty much blocking there
>> waiting for the current query to stop or throw an exception. In your case
>> the second query will not even start.
>>
>> What you could do instead is remove all the blocking calls and use
>> spark.streams.awaitAnyTermination instead (waiting for either query1 or
>> query2 to terminate). Make sure you do that after the query2.start call.
>>
>>
>>
>> I hope this helps.
>>
>>
>>
>> Cheers,
>>
>> Panagiotis
>>
>>
>>
>> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu 
>> wrote:
>>
>> Any help?
>>
>> Need urgent help. Someone please clarify the doubt?
>>
>>
>>
>> -- Forwarded message --
>> From: *Aakash Basu* 
>> Date: Thu, Apr 5, 2018 at 3:18 PM
>> Subject: [Structured Streaming] More than 1 streaming in a code
>> To: user 
>>
>> Hi,
>>
>> If I have more than one writeStream in a code, which operates on the same
>> readStream data, why does it produce only the first writeStream? I want the
>> second one to be also printed on the console.
>>
>> How to do that?
>>
>>
>>
>> from pyspark.sql import 

Re: Spark-ML : Streaming library for Factorization Machine (FM/FFM)

2018-04-16 Thread Maximilien DEFOURNE

Hi,

Unfortunately, no. I just used this lib for FM and FFM as-is. I thought it 
could be a good baseline for your needs.


Regards

Maximilien


On 16/04/18 15:43, Sundeep Kumar Mehta wrote:

Hi Maximilien,

Thanks for your response, Did you convert this repo into DStream for 
continuous/incremental training ?


Regards
Sundeep






Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Lalwani, Jayesh
You could have a really large window.

From: Aakash Basu 
Date: Monday, April 16, 2018 at 10:56 AM
To: "Lalwani, Jayesh" 
Cc: spark receiver , Panagiotis Garefalakis 
, user 
Subject: Re: [Structured Streaming] More than 1 streaming in a code

If I use timestamp based windowing, then my average will not be global average 
but grouped by timestamp, which is not my requirement. I want to recalculate 
the avg of entire column, every time a new row(s) comes in and divide the other 
column with the updated avg.
Let me know, in-case you or anyone else has any soln. for this.

On Mon, Apr 16, 2018 at 7:52 PM, Lalwani, Jayesh 
> wrote:
You could do it if you had a timestamp in your data.  You can use windowed 
operations to divide a value by it’s own average over a window. However, in 
structured streaming, you can only window by timestamp columns. You cannot do 
windows aggregations on integers.

From: Aakash Basu 
>
Date: Monday, April 16, 2018 at 4:52 AM
To: "Lalwani, Jayesh" 
>
Cc: spark receiver >, 
Panagiotis Garefalakis >, user 
>

Subject: Re: [Structured Streaming] More than 1 streaming in a code

Hey Jayesh and Others,
Is there then, any other way to come to a solution for this use-case?

Thanks,
Aakash.

On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh 
> wrote:
Note that what you are trying to do here is join a streaming data frame with an 
aggregated streaming data frame. As per the documentation, joining an 
aggregated streaming data frame with another streaming data frame is not 
supported


From: spark receiver >
Date: Friday, April 13, 2018 at 11:49 PM
To: Aakash Basu >
Cc: Panagiotis Garefalakis >, 
user >
Subject: Re: [Structured Streaming] More than 1 streaming in a code

Hi Panagiotis ,

Wondering you solved the problem or not? Coz I met the same issue today. I’d 
appreciate  so much if you could paste the code snippet  if it’s working .

Thanks.


在 2018年4月6日,上午7:40,Aakash Basu 
> 写道:

Hi Panagiotis,
I did that, but it still prints the result of the first query and awaits for 
new data, doesn't even goes to the next one.
Data -

$ nc -lk 9998

1,2
3,4
5,6
7,8
Result -

---
Batch: 0
---
++
|aver|
++
| 3.0|
++

---
Batch: 1
---
++
|aver|
++
| 4.0|
++


Updated Code -

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()

data = spark \
.readStream \
.format("socket") \
.option("header","true") \
.option("host", "localhost") \
.option("port", 9998) \
.load("csv")


id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")

df.createOrReplaceTempView("abcd")

wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
from ds")  # (select aver from abcd)

query2 = df \
.writeStream \
.format("console") \
.outputMode("complete") \
.trigger(processingTime='5 seconds') \
.start()

query = wordCounts \
.writeStream \
.format("console") \
.trigger(processingTime='5 seconds') \
.start()

spark.streams.awaitAnyTermination()

Thanks,
Aakash.

On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis 
> wrote:
Hello Aakash,

When you use query.awaitTermination you are pretty much blocking there waiting 
for the current query to stop or throw an exception. In your case the second 
query will not even start.
What you could do instead is remove all the blocking calls and use 
spark.streams.awaitAnyTermination instead (waiting for either query1 or query2 
to terminate). Make sure you do that after the query2.start call.

I hope this helps.

Cheers,
Panagiotis

On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu 
> wrote:
Any help?
Need urgent help. Someone please 

Error: NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT while running a Spark-Hive Job

2018-04-16 Thread Rishikesh Gawade
Hello there,
I am using *spark-2.3.0* compiled using the following Maven Command:
*mvn -Pyarn -Phive -Phive-thriftserver -DskipTests clean install.*
I have configured it to run with *Hive v2.3.3*. Also, all the Hive-related
jars (*v1.2.1*) in Spark's jars folder have been replaced by all the
JARs available in Hive's *lib* folder, and I have also configured Hive to
use *spark* as its *execution engine*.

After that, in my Java application i have written the following lines:

SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("hive.metastore.warehouse.dir","/user/hive/warehouse")
.config("hive.metastore.uris","thrift://hadoopmaster:9083")
.enableHiveSupport()
.getOrCreate();
HiveContext hc = new HiveContext(spark);
hc.sql("SHOW DATABASES").show();

I built the project thereafter (no compilation errors) and tried running
the job using the spark-submit command as follows:

spark-submit --master yarn --class org.adbms.SpamFilter
IdeaProjects/mlproject/target/mlproject-1.0-SNAPSHOT.jar

On doing so, I received the following error:
Exception in thread "main" java.lang.NoSuchFieldError:
HIVE_STATS_JDBC_TIMEOUT
at
org.apache.spark.sql.hive.HiveUtils$.formatTimeVarsForHiveClient(HiveUtils.scala:205)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:286)
at
org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:66)
at
org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:65)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:195)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
at
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
at
org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:194)
and more...


I have no idea what this is about. Is this because Spark v2.3.0 and Hive
v2.3.3 aren't compatible with each other?
If I have done anything wrong, I request you to point it out and suggest
the required changes. Also, if it's the case that I might have
misconfigured Spark and Hive, please suggest the changes in
configuration; a link guiding through all the necessary configs would also be
appreciated.
Thank you in anticipation.
Regards,
Rishikesh Gawade


Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Aakash Basu
If I use timestamp-based windowing, then my average will not be a global
average but one grouped by timestamp, which is not my requirement. I want to
recalculate the average of the entire column every time new rows come in and
divide the other column by the updated average.

Let me know in case you or anyone else has a solution for this.

On Mon, Apr 16, 2018 at 7:52 PM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> You could do it if you had a timestamp in your data.  You can use windowed
> operations to divide a value by it’s own average over a window. However, in
> structured streaming, you can only window by timestamp columns. You cannot
> do windows aggregations on integers.
>
>
>
> *From: *Aakash Basu 
> *Date: *Monday, April 16, 2018 at 4:52 AM
> *To: *"Lalwani, Jayesh" 
> *Cc: *spark receiver , Panagiotis Garefalakis <
> panga...@gmail.com>, user 
>
> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>
>
>
> Hey Jayesh and Others,
>
> Is there then, any other way to come to a solution for this use-case?
>
>
>
> Thanks,
>
> Aakash.
>
>
>
> On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
> Note that what you are trying to do here is join a streaming data frame
> with an aggregated streaming data frame. As per the documentation, joining
> an aggregated streaming data frame with another streaming data frame is not
> supported
>
>
>
>
>
> *From: *spark receiver 
> *Date: *Friday, April 13, 2018 at 11:49 PM
> *To: *Aakash Basu 
> *Cc: *Panagiotis Garefalakis , user <
> user@spark.apache.org>
> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>
>
>
> Hi Panagiotis ,
>
>
>
> Wondering you solved the problem or not? Coz I met the same issue today.
> I’d appreciate  so much if you could paste the code snippet  if it’s
> working .
>
>
>
> Thanks.
>
>
>
>
>
> 在 2018年4月6日,上午7:40,Aakash Basu  写道:
>
>
>
> Hi Panagiotis,
>
> I did that, but it still prints the result of the first query and awaits
> for new data, doesn't even goes to the next one.
>
> *Data -*
>
> $ nc -lk 9998
>
> 1,2
> 3,4
> 5,6
> 7,8
>
> *Result -*
>
> ---
> Batch: 0
> ---
> ++
> |aver|
> ++
> | 3.0|
> ++
>
> ---
> Batch: 1
> ---
> ++
> |aver|
> ++
> | 4.0|
> ++
>
>
>
> *Updated Code -*
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split
>
> spark = SparkSession \
> .builder \
> .appName("StructuredNetworkWordCount") \
> .getOrCreate()
>
> data = spark \
> .readStream \
> .format("socket") \
> .option("header","true") \
> .option("host", "localhost") \
> .option("port", 9998) \
> .load("csv")
>
>
> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
> split(data.value, ",").getItem(1).alias("col2"))
>
> id_DF.createOrReplaceTempView("ds")
>
> df = spark.sql("select avg(col1) as aver from ds")
>
> df.createOrReplaceTempView("abcd")
>
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
> from ds")  # (select aver from abcd)
>
> query2 = df \
> .writeStream \
> .format("console") \
> .outputMode("complete") \
> .trigger(processingTime='5 seconds') \
> .start()
>
> query = wordCounts \
> .writeStream \
> .format("console") \
> .trigger(processingTime='5 seconds') \
> .start()
>
> spark.streams.awaitAnyTermination()
>
>
>
> Thanks,
>
> Aakash.
>
>
>
> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis 
> wrote:
>
> Hello Aakash,
>
>
>
> When you use query.awaitTermination you are pretty much blocking there
> waiting for the current query to stop or throw an exception. In your case
> the second query will not even start.
>
> What you could do instead is remove all the blocking calls and use
> spark.streams.awaitAnyTermination instead (waiting for either query1 or
> query2 to terminate). Make sure you do that after the query2.start call.
>
>
>
> I hope this helps.
>
>
>
> Cheers,
>
> Panagiotis
>
>
>
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu 
> wrote:
>
> Any help?
>
> Need urgent help. Someone please clarify the doubt?
>
>
>
> -- Forwarded message --
> From: *Aakash Basu* 
> Date: Thu, Apr 5, 2018 at 3:18 PM
> Subject: [Structured Streaming] More than 1 streaming in a code
> To: user 
>
> Hi,
>
> If I have more than one writeStream in a code, which operates on the same
> readStream data, why does it produce only the first writeStream? I want the
> second one to be also printed on the console.
>
> How to do 

Re: ERROR: Hive on Spark

2018-04-16 Thread naresh Goud
Change your table name in the query to spam.spamdataset instead of spamdataset.

On Sun, Apr 15, 2018 at 2:12 PM Rishikesh Gawade 
wrote:

> Hello there. I am a newbie in the world of Spark. I have been working on a
> Spark Project using Java.
> I have configured Hive and Spark to run on Hadoop.
> As of now i have created a Hive (derby) database on Hadoop HDFS at the
> given location(warehouse location): */user/hive/warehouse *and database
> name as : *spam *(saved as *spam.db* at the aforementioned location).
> I have been trying to read tables in this database in spark to create
> RDDs/DataFrames.
> Could anybody please guide me in how I can achieve this?
> I used the following statements in my Java Code:
>
> SparkSession spark = SparkSession
> .builder()
> .appName("Java Spark Hive Example").master("yarn")
> .config("spark.sql.warehouse.dir","/user/hive/warehouse")
> .enableHiveSupport()
> .getOrCreate();
> spark.sql("USE spam");
> spark.sql("SELECT * FROM spamdataset").show();
>
> After this i built the project using Maven as follows: mvn clean package
> -DskipTests and a JAR was generated.
>
> After this, I tried running the project via spark-submit CLI using :
>
> spark-submit --class com.adbms.SpamFilter --master yarn
> ~/IdeaProjects/mlproject/target/mlproject-1.0-SNAPSHOT.jar
>
> and got the following error:
>
> Exception in thread "main"
> org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database
> 'spam' not found;
> at org.apache.spark.sql.catalyst.catalog.SessionCatalog.org
> $apache$spark$sql$catalyst$catalog$SessionCatalog$$requireDbExists(SessionCatalog.scala:174)
> at
> org.apache.spark.sql.catalyst.catalog.SessionCatalog.setCurrentDatabase(SessionCatalog.scala:256)
> at
> org.apache.spark.sql.execution.command.SetDatabaseCommand.run(databases.scala:59)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
> at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
> at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
> at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
> at org.apache.spark.sql.Dataset.(Dataset.scala:190)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
> at com.adbms.SpamFilter.main(SpamFilter.java:54)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Also, I replaced the SQL query with "SHOW DATABASES", and it showed only
> one database namely "default". Those stored on HDFS warehouse dir weren't
> shown.
>
> I request you to please check this and if anything is wrong then please
> suggest an ideal way to read Hive tables on Hadoop in Spark using Java. A
> link to a webpage having relevant info would also be appreciated.
> Thank you in anticipation.
> Regards,
> Rishikesh Gawade
>
> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Lalwani, Jayesh
You could do it if you had a timestamp in your data. You can use windowed 
operations to divide a value by its own average over a window. However, in 
structured streaming, you can only window by timestamp columns. You cannot do 
window aggregations on integers.

From: Aakash Basu 
Date: Monday, April 16, 2018 at 4:52 AM
To: "Lalwani, Jayesh" 
Cc: spark receiver , Panagiotis Garefalakis 
, user 
Subject: Re: [Structured Streaming] More than 1 streaming in a code

Hey Jayesh and Others,
Is there then, any other way to come to a solution for this use-case?

Thanks,
Aakash.

On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh 
> wrote:
Note that what you are trying to do here is join a streaming data frame with an 
aggregated streaming data frame. As per the documentation, joining an 
aggregated streaming data frame with another streaming data frame is not 
supported


From: spark receiver >
Date: Friday, April 13, 2018 at 11:49 PM
To: Aakash Basu >
Cc: Panagiotis Garefalakis >, 
user >
Subject: Re: [Structured Streaming] More than 1 streaming in a code

Hi Panagiotis ,

Wondering you solved the problem or not? Coz I met the same issue today. I’d 
appreciate  so much if you could paste the code snippet  if it’s working .

Thanks.


在 2018年4月6日,上午7:40,Aakash Basu 
> 写道:

Hi Panagiotis,
I did that, but it still prints the result of the first query and awaits for 
new data, doesn't even goes to the next one.
Data -

$ nc -lk 9998

1,2
3,4
5,6
7,8
Result -

---
Batch: 0
---
++
|aver|
++
| 3.0|
++

---
Batch: 1
---
++
|aver|
++
| 4.0|
++


Updated Code -

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()

data = spark \
.readStream \
.format("socket") \
.option("header","true") \
.option("host", "localhost") \
.option("port", 9998) \
.load("csv")


id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")

df.createOrReplaceTempView("abcd")

wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
from ds")  # (select aver from abcd)

query2 = df \
.writeStream \
.format("console") \
.outputMode("complete") \
.trigger(processingTime='5 seconds') \
.start()

query = wordCounts \
.writeStream \
.format("console") \
.trigger(processingTime='5 seconds') \
.start()

spark.streams.awaitAnyTermination()

Thanks,
Aakash.

On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis 
> wrote:
Hello Aakash,

When you use query.awaitTermination you are pretty much blocking there waiting 
for the current query to stop or throw an exception. In your case the second 
query will not even start.
What you could do instead is remove all the blocking calls and use 
spark.streams.awaitAnyTermination instead (waiting for either query1 or query2 
to terminate). Make sure you do that after the query2.start call.

I hope this helps.

Cheers,
Panagiotis

On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu 
> wrote:
Any help?
Need urgent help. Someone please clarify the doubt?

-- Forwarded message --
From: Aakash Basu 
>
Date: Thu, Apr 5, 2018 at 3:18 PM
Subject: [Structured Streaming] More than 1 streaming in a code
To: user >
Hi,
If I have more than one writeStream in a code, which operates on the same 
readStream data, why does it produce only the first writeStream? I want the 
second one to be also printed on the console.
How to do that?


from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

class test:


spark = SparkSession.builder \
.appName("Stream_Col_Oper_Spark") \
.getOrCreate()

data = spark.readStream.format("kafka") \
.option("startingOffsets", "latest") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test1") \
.load()

ID = data.select('value') \
.withColumn('value', 

Re: Spark-ML : Streaming library for Factorization Machine (FM/FFM)

2018-04-16 Thread Sundeep Kumar Mehta
Hi Maximilien,

Thanks for your response. Did you convert this repo to DStreams for
continuous/incremental training?

Regards
Sundeep

On Mon, Apr 16, 2018 at 4:17 PM, Maximilien DEFOURNE <
maximilien.defou...@s4m.io> wrote:

> Hi,
>
> I used this repo for FM/FFM : https://github.com/Intel-
> bigdata/imllib-spark
>
>
> Regards
>
> Maximilien DEFOURNE
>
> On 15/04/18 05:14, Sundeep Kumar Mehta wrote:
>
> Hi All,
>
> Any library/ github project to use factorization machine or field aware
> factorization machine via online learning for continuous training ?
>
> Request you to please share your thoughts on this.
>
> Regards
> Sundeep
>
>
>
>


Re: Spark-ML : Streaming library for Factorization Machine (FM/FFM)

2018-04-16 Thread Maximilien DEFOURNE

Hi,

I used this repo for FM/FFM : https://github.com/Intel-bigdata/imllib-spark


Regards

Maximilien DEFOURNE


On 15/04/18 05:14, Sundeep Kumar Mehta wrote:

Hi All,

Any library/ github project to use factorization machine or field 
aware factorization machine via online learning for continuous training ?


Request you to please share your thoughts on this.

Regards
Sundeep






Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Aakash Basu
Hey Jayesh and Others,

Is there then, any other way to come to a solution for this use-case?

Thanks,
Aakash.

On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> Note that what you are trying to do here is join a streaming data frame
> with an aggregated streaming data frame. As per the documentation, joining
> an aggregated streaming data frame with another streaming data frame is not
> supported
>
>
>
>
>
> *From: *spark receiver 
> *Date: *Friday, April 13, 2018 at 11:49 PM
> *To: *Aakash Basu 
> *Cc: *Panagiotis Garefalakis , user <
> user@spark.apache.org>
> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>
>
>
> Hi Panagiotis ,
>
>
>
> Wondering you solved the problem or not? Coz I met the same issue today.
> I’d appreciate  so much if you could paste the code snippet  if it’s
> working .
>
>
>
> Thanks.
>
>
>
>
>
> 在 2018年4月6日,上午7:40,Aakash Basu  写道:
>
>
>
> Hi Panagiotis,
>
> I did that, but it still prints the result of the first query and awaits
> for new data, doesn't even goes to the next one.
>
> *Data -*
>
> $ nc -lk 9998
>
> 1,2
> 3,4
> 5,6
> 7,8
>
> *Result -*
>
> ---
> Batch: 0
> ---
> ++
> |aver|
> ++
> | 3.0|
> ++
>
> ---
> Batch: 1
> ---
> ++
> |aver|
> ++
> | 4.0|
> ++
>
>
>
> *Updated Code -*
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split
>
> spark = SparkSession \
> .builder \
> .appName("StructuredNetworkWordCount") \
> .getOrCreate()
>
> data = spark \
> .readStream \
> .format("socket") \
> .option("header","true") \
> .option("host", "localhost") \
> .option("port", 9998) \
> .load("csv")
>
>
> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
> split(data.value, ",").getItem(1).alias("col2"))
>
> id_DF.createOrReplaceTempView("ds")
>
> df = spark.sql("select avg(col1) as aver from ds")
>
> df.createOrReplaceTempView("abcd")
>
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
> from ds")  # (select aver from abcd)
>
> query2 = df \
> .writeStream \
> .format("console") \
> .outputMode("complete") \
> .trigger(processingTime='5 seconds') \
> .start()
>
> query = wordCounts \
> .writeStream \
> .format("console") \
> .trigger(processingTime='5 seconds') \
> .start()
>
> spark.streams.awaitAnyTermination()
>
>
>
> Thanks,
>
> Aakash.
>
>
>
> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis 
> wrote:
>
> Hello Aakash,
>
>
>
> When you use query.awaitTermination you are pretty much blocking there
> waiting for the current query to stop or throw an exception. In your case
> the second query will not even start.
>
> What you could do instead is remove all the blocking calls and use
> spark.streams.awaitAnyTermination instead (waiting for either query1 or
> query2 to terminate). Make sure you do that after the query2.start call.
>
>
>
> I hope this helps.
>
>
>
> Cheers,
>
> Panagiotis
>
>
>
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu 
> wrote:
>
> Any help?
>
> Need urgent help. Someone please clarify the doubt?
>
>
>
> -- Forwarded message --
> From: *Aakash Basu* 
> Date: Thu, Apr 5, 2018 at 3:18 PM
> Subject: [Structured Streaming] More than 1 streaming in a code
> To: user 
>
> Hi,
>
> If I have more than one writeStream in a code, which operates on the same
> readStream data, why does it produce only the first writeStream? I want the
> second one to be also printed on the console.
>
> How to do that?
>
>
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("Stream_Col_Oper_Spark") \
> .getOrCreate()
>
> data = spark.readStream.format("kafka") \
> .option("startingOffsets", "latest") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("subscribe", "test1") \
> .load()
>
> ID = data.select('value') \
> .withColumn('value', data.value.cast("string")) \
> .withColumn("Col1", split(col("value"), ",").getItem(0)) \
> .withColumn("Col2", split(col("value"), ",").getItem(1)) \
> .drop('value')
>
> ID.createOrReplaceTempView("transformed_Stream_DF")
>
> df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
>
> df.createOrReplaceTempView("abcd")
>
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) 
> col3 from transformed_Stream_DF")
>
>
> # ---#
>
> query1 = df \
> 

Re: Structured Streaming on Kubernetes

2018-04-16 Thread Krishna Kalyan
Thank you so much, TD, Matt, Anirudh and Oz.
Really appreciate this.

On Fri, Apr 13, 2018 at 9:54 PM, Oz Ben-Ami  wrote:

> I can confirm that Structured Streaming works on Kubernetes, though we're
> not quite on production with that yet. Issues we're looking at are:
> - Submission through spark-submit works, but is a bit clunky with a
> kubernetes-centered workflow. Spark Operator
>  is
> promising, but still in alpha (eg, we ran into this
> ). Even better
> would be something that runs the driver as a Deployment / StatefulSet, so
> that long-running streaming jobs can be restarted automatically
> - Dynamic allocation: works with the spark-on-k8s fork, but not with plain
> Spark 2.3, due to reliance on shuffle service which hasn't been merged yet.
> Ideal implementation would be able to connect to a PersistentVolume
> independently of a node, but that's a bit more complicated
> - Checkpointing: We checkpoint to a separate HDFS (Dataproc) cluster,
> which works well for us both on the old Spark Streaming and Structured
> Streaming. We've successfully experimented with HDFS on Kubernetes
> , but
> again not in production
> - UI: Unfortunately Structured Streaming does not yet have a comprehensive
> UI like the old Spark Streaming, but it does show the basic information
> (jobs, stages, queries, executors), and other information is generally
> available in the logs and metrics
> - Monitoring / Logging: this is a strength of Kubernetes, in that it's all
> centralized by the cluster. We use Splunk, but it would also be possible to 
> hook
> up  Spark's Dropwizard
> Metrics library to Prometheus, and read logs with fluentd or Stackdriver.
> - Side note: Kafka support in Spark and Structured Streaming is very good,
> but as of Spark 2.3 there are still a couple of missing features, notably
> transparent avro support (UDFs are needed) and taking advantage of
> transactional processing (introduced to Kafka last year) for better
> exactly-once guarantees
>
> On Fri, Apr 13, 2018 at 3:08 PM, Anirudh Ramanathan <
> ramanath...@google.com> wrote:
>
>> +ozzieba who was experimenting with streaming workloads recently. +1 to
>> what Matt said. Checkpointing and driver recovery is future work.
>> Structured streaming is important, and it would be good to get some
>> production experiences here and try and target improving the feature's
>> support on K8s for 2.4/3.0.
>>
>>
>> On Fri, Apr 13, 2018 at 11:55 AM Matt Cheah  wrote:
>>
>>> We don’t provide any Kubernetes-specific mechanisms for streaming, such
>>> as checkpointing to persistent volumes. But as long as streaming doesn’t
>>> require persisting to the executor’s local disk, streaming ought to work
>>> out of the box. E.g. you can checkpoint to HDFS, but not to the pod’s local
>>> directories.
>>>
>>>
>>>
>>> However, I’m unaware of any specific use of streaming with the Spark on
>>> Kubernetes integration right now. Would be curious to get feedback on the
>>> failover behavior right now.
>>>
>>>
>>>
>>> -Matt Cheah
>>>
>>>
>>>
>>> *From: *Tathagata Das 
>>> *Date: *Friday, April 13, 2018 at 1:27 AM
>>> *To: *Krishna Kalyan 
>>> *Cc: *user 
>>> *Subject: *Re: Structured Streaming on Kubernetes
>>>
>>>
>>>
>>> Structured streaming is stable in production! At Databricks, we and our
>>> customers collectively process almost 100s of billions of records per day
>>> using SS. However, we are not using kubernetes :)
>>>
>>>
>>>
>>> Though I don't think it will matter too much as long as kubes are
>>> correctly provisioned+configured and you are checkpointing to HDFS (for
>>> fault-tolerance guarantees).
>>>
>>>
>>>
>>> TD
>>>
>>>
>>>
>>> On Fri, Apr 13, 2018, 12:28 AM Krishna Kalyan 
>>> wrote:
>>>
>>> Hello All,
>>>
>>> We were evaluating Spark Structured Streaming on Kubernetes (Running on
>>> GCP). It would be awesome if the spark community could share their
>>> experience around this. I would like to know more about you production
>>> experience and the monitoring tools you are using.
>>>
>>>
>>>
>>> Since spark on kubernetes is a relatively new addition to spark, I was
>>> wondering if structured streaming is stable in production. We were also
>>> evaluating Apache Beam with Flink.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Krishna
>>>
>>>
>>>
>>>
>>>
>>>
>>
>> --
>> Anirudh Ramanathan
>>
>
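
As a footnote to the checkpointing points above, a minimal sketch of that pattern (host names, topic, and paths are all illustrative): keep the Structured Streaming checkpoint on durable storage such as HDFS rather than a pod-local path.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("k8s-structured-streaming").getOrCreate()

events = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "kafka:9092")
          .option("subscribe", "events")
          .load())

query = (events.writeStream
         .format("parquet")
         .option("path", "hdfs://namenode:8020/data/events")
         # Durable checkpoint location rather than a pod-local directory
         .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/events")
         .start())
query.awaitTermination()
```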
>