Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

2018-07-05 Thread Tathagata Das
Hey all,

In Spark 2.4.0, there will be a new feature called *foreachBatch* which
will expose the output rows of every micro-batch as a dataframe, on which
you apply a user-defined function. With that, you can reuse existing batch
sources for writing results as well as write results to multiple locations.

*Reuse existing batch data sources*
For many storage systems, there may not be a streaming sink available yet,
but there may already exist a data writer for batch queries. Using
foreachBatch(), you can use the batch data writers on the output of each
micro-batch. For example, writing from a stream to Cassandra using the
Cassandra Spark Connector would look like this:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.cassandraFormat(...).save(...)
}

*Write to multiple locations*
If you want to write the output of a streaming query to multiple locations,
then you can simply write the output DataFrame/Dataset multiple times.
However, each attempt to write can cause the output data to be recomputed
(including possible re-reading of the input data). To avoid recomputations,
you should cache the output DataFrame/Dataset, write it to multiple
locations, and then uncache it. Here is an outline.

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.cache()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

*Apply additional DataFrame operations*
Many DataFrame and Dataset operations are not supported in streaming
DataFrames because Spark does not support generating incremental plans in
those cases. Using foreachBatch() you can apply some of these operations on
each micro-batch output. However, you will have to reason about the
end-to-end semantics of doing that operation yourself.
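For instance, here is a minimal illustrative sketch (with assumed column and
path names) of applying an operation that is not supported on streaming
DataFrames, such as sorting outside of complete-mode aggregations, to each
micro-batch output:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Sorting works here because batchDF is a plain batch DataFrame.
  // Note that the ordering is only per micro-batch, not global across the stream.
  batchDF
    .sort("eventTime")
    .write.mode("append").parquet("sorted_output")
}.start()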

*NOTE:* By default foreachBatch() provides only at-least-once write
guarantees. However, you can use the batchId provided to the function as a
way to deduplicate the output and get an exactly-once guarantee.
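For example, here is a minimal sketch (with assumed path names, and only one
of several possible schemes) of using batchId for an idempotent write: key
the output location by batchId and overwrite it, so that a replayed
micro-batch rewrites the same location instead of appending duplicates.

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // If this batch is retried after a failure, the overwrite replaces any
  // partial or duplicate output from the earlier attempt.
  batchDF.write.mode("overwrite").parquet(s"dedup_output/batch_id=$batchId")
}.start()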

TD

On Thu, Jul 5, 2018 at 12:33 AM, Amiya Mishra <
amiya.mis...@bitwiseglobal.com> wrote:

> Hi Chandan/Jürgen,
>
> I tried this with native code, using a single input data frame with
> multiple sinks, as follows.
>
> Spark provides a method called awaitAnyTermination() in
> StreamingQueryManager.scala which provides all the required details to
> handle the queries processed by Spark. From the Spark documentation:
> -> Wait until any of the queries on the associated SQLContext has
> terminated since the creation of the context, or since `resetTerminated()`
> was called. If any query was terminated with an exception, then the
> exception will be thrown.
> -> If a query has terminated, then subsequent calls to
> `awaitAnyTermination()` will either return immediately (if the query was
> terminated by `query.stop()`), or throw the exception immediately (if the
> query was terminated with an exception). Use `resetTerminated()` to clear
> past terminations and wait for new terminations.
> -> In the case where multiple queries have terminated since
> `resetTerminated()` was called, if any query has terminated with an
> exception, then `awaitAnyTermination()` will throw any of the exceptions.
> To correctly handle exceptions across multiple queries, users need to stop
> all of them after any of them terminates with an exception, and then check
> `query.exception()` for each query.
>
>
> val inputdf: DataFrame = sparkSession.readStream.schema(schema).format("csv")
>   .option("delimiter", ",").csv("src/main/streamingInput")
> val query1 = inputdf.writeStream.option("path", "first_output")
>   .option("checkpointLocation", "checkpointloc").format("csv").start()
> val query2 = inputdf.writeStream.option("path", "second_output")
>   .option("checkpointLocation", "checkpoint2").format("csv").start()
> sparkSession.streams.awaitAnyTermination()
>
>
> Now both the "first_output" and "second_output" files are written
> successfully.
>
> Try it out on your side and let me know if you find any limitation, and
> please post if you find any other way.
>
> Please correct me if I made any grammatical mistakes.
>
> Thanks
> Amiya
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


unsubscribe

2018-07-05 Thread Peter
unsubscribe

Fwd: BeakerX 1.0 released

2018-07-05 Thread s...@draves.org
We are pleased to announce the release of BeakerX 1.0.

BeakerX is a collection of kernels and extensions to the Jupyter
interactive computing environment. It provides JVM support, Spark cluster
support, polyglot programming, interactive plots, tables, forms,
publishing, and more.


   - Groovy, Scala, Clojure, Kotlin, Java, and SQL, including many magics;
   - Widgets for time-series plotting, tables, forms, and more (there are
   Python and JavaScript APIs in addition to the JVM languages);
   - Polyglot magics and autotranslation, allowing you to access multiple
   languages in the same notebook, and seamlessly communicate between them;
   - Apache Spark integration including GUI configuration, status,
   progress, interrupt, and tables;
   - One-click publication with interactive plots and tables, and
   - Jupyter Lab.

BeakerX is available via conda, pip, and docker. Or try it live online with
Binder.

Thank you, -Scott


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-05 Thread Jayant Shekhar
Hello Chetan,

We have currently done it with .pipe(.py) as Prem suggested.

That passes the RDD as CSV strings to the Python script. The Python script
can either process it line by line, create the result, and return it back,
or create something like a Pandas DataFrame for processing and finally write
the results back.

In the Spark Scala/Java code, you get back an RDD of strings, which we
convert back to a DataFrame.
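For reference, here is a minimal self-contained sketch of this pattern; the
script name "transform.py" and the column layout are illustrative
assumptions, not part of the original setup:

import org.apache.spark.sql.SparkSession

object PipePythonExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("pipe-python-udf").getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 1), ("b", 2)).toDF("key", "value")

    // Serialize each row as a CSV line and pipe it through the external
    // Python script, which reads lines from stdin and writes transformed
    // CSV lines to stdout.
    val pipedRDD = df.rdd
      .map(row => row.mkString(","))
      .pipe("python transform.py")

    // Parse the returned CSV strings back into a DataFrame.
    val resultDF = pipedRDD
      .map(_.split(","))
      .map(arr => (arr(0), arr(1).toInt))
      .toDF("key", "value")

    resultDF.show()
    spark.stop()
  }
}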

Feel free to ping me directly in case of questions.

Thanks,
Jayant


On Thu, Jul 5, 2018 at 3:39 AM, Chetan Khatri 
wrote:

> Prem sure, Thanks for suggestion.
>
> On Wed, Jul 4, 2018 at 8:38 PM, Prem Sure  wrote:
>
>> try .pipe(.py) on RDD
>>
>> Thanks,
>> Prem
>>
>> On Wed, Jul 4, 2018 at 7:59 PM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Can someone please suggest, thanks
>>>
>>> On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, 
>>> wrote:
>>>
 Hello Dear Spark User / Dev,

 I would like to pass a Python user-defined function to a Spark job
 developed in Scala, and have the return value of that function available
 through the DF / Dataset API.

 Can someone please guide me on the best approach to do this? The Python
 function would mostly be a transformation function. I would also like to
 pass a Java function as a string to the Spark / Scala job, have it applied
 to an RDD / Data Frame, and return an RDD / Data Frame.

 Thank you.




>>
>


Strange behavior of Spark Masters during rolling update

2018-07-05 Thread bsikander
We have a Spark standalone cluster running Spark 2.2.1 in HA mode using
ZooKeeper. Occasionally we have a rolling update where first the primary
master goes down, then the secondary master, and then the ZooKeeper nodes
running on their own VMs. In the image below (image not included in this
archive):

Legend:
- Green line shows Secondary Master
- Yellow line shows Primary Master
- "1.0" on vertical axis shows STANDBY
- "5.0" on vertical axis shows UP

In the image you can see the strange behavior of the Spark masters.
- At 16:25, why did the masters switch roles between STANDBY and ALIVE?

Any help would be appreciated.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 2.3 Kubernetes error

2018-07-05 Thread purna pradeep
Hello,



When I try to set the below options on the spark-submit command on the k8s
master, I get the below error in the spark-driver pod logs:



--conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \

--conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \



But when I set these extra Java options as system properties in the Spark
application jar, everything works fine.



2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing SparkContext.

org.apache.spark.SparkException: External scheduler cannot be instantiated
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
    at org.apache.spark.SparkContext.init(SparkContext.scala:492)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [test-657e2f715ada3f91ae32c588aa178f63-driver] in namespace: [test] failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.init(KubernetesClusterSchedulerBackend.scala:70)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
    ... 12 more
Caused by: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
    at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959)
    at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)
    at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
    at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
    at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
    at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
    at sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)
    at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
    at okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)
    at okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)
    at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)
    at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)
    at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
    at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-05 Thread Chetan Khatri
Prem sure, Thanks for suggestion.

On Wed, Jul 4, 2018 at 8:38 PM, Prem Sure  wrote:

> try .pipe(.py) on RDD
>
> Thanks,
> Prem
>
> On Wed, Jul 4, 2018 at 7:59 PM, Chetan Khatri  > wrote:
>
>> Can someone please suggest, thanks
>>
>> On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, 
>> wrote:
>>
>>> Hello Dear Spark User / Dev,
>>>
>>> I would like to pass a Python user-defined function to a Spark job
>>> developed in Scala, and have the return value of that function available
>>> through the DF / Dataset API.
>>>
>>> Can someone please guide me on the best approach to do this? The Python
>>> function would mostly be a transformation function. I would also like to
>>> pass a Java function as a string to the Spark / Scala job, have it applied
>>> to an RDD / Data Frame, and return an RDD / Data Frame.
>>>
>>> Thank you.
>>>
>>>
>>>
>>>
>


Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

2018-07-05 Thread Amiya Mishra
Hi Chandan/Jürgen,

I tried this with native code, using a single input data frame with
multiple sinks, as follows.

Spark provides a method called awaitAnyTermination() in
StreamingQueryManager.scala which provides all the required details to
handle the queries processed by Spark. From the Spark documentation:
-> Wait until any of the queries on the associated SQLContext has
terminated since the creation of the context, or since `resetTerminated()`
was called. If any query was terminated with an exception, then the
exception will be thrown.
-> If a query has terminated, then subsequent calls to
`awaitAnyTermination()` will either return immediately (if the query was
terminated by `query.stop()`), or throw the exception immediately (if the
query was terminated with an exception). Use `resetTerminated()` to clear
past terminations and wait for new terminations.
-> In the case where multiple queries have terminated since
`resetTerminated()` was called, if any query has terminated with an
exception, then `awaitAnyTermination()` will throw any of the exceptions.
To correctly handle exceptions across multiple queries, users need to stop
all of them after any of them terminates with an exception, and then check
`query.exception()` for each query.


val inputdf: DataFrame = sparkSession.readStream.schema(schema).format("csv")
  .option("delimiter", ",").csv("src/main/streamingInput")
val query1 = inputdf.writeStream.option("path", "first_output")
  .option("checkpointLocation", "checkpointloc").format("csv").start()
val query2 = inputdf.writeStream.option("path", "second_output")
  .option("checkpointLocation", "checkpoint2").format("csv").start()
sparkSession.streams.awaitAnyTermination()


Now both the "first_output" and "second_output" files are written
successfully.

Try it out on your side and let me know if you find any limitation, and
please post if you find any other way.

Please correct me if I made any grammatical mistakes.

Thanks
Amiya



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Automatic Json Schema inference using Structured Streaming

2018-07-05 Thread SRK
Hi,

Is there a way to do automatic JSON schema inference using Structured
Streaming? I do not want to supply a predefined schema and bind it.

With Spark Kafka Direct I could do spark.read.json(). I see that this is not
supported in Structured Streaming.
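For what it's worth, here is a minimal sketch of a common workaround: infer
the schema once from a static sample using the batch reader, then reuse it
for the stream. The sample path, Kafka settings, and topic name below are
illustrative assumptions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json

val spark = SparkSession.builder.appName("json-schema-inference").getOrCreate()
import spark.implicits._

// Batch read of a representative sample; spark.read.json infers the schema here.
val inferredSchema = spark.read.json("src/main/sampleInput").schema

// Reuse the inferred schema on the streaming source.
val streamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .select(from_json($"value".cast("string"), inferredSchema).as("data"))
  .select("data.*")

Note that the schema is fixed when the query starts, so fields that appear
only in later data will not be picked up.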


Thanks!



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Streaming MEMORY_ONLY] Understanding Dataflow

2018-07-05 Thread Thomas Lavocat
Excerpts from Prem Sure's message of 2018-07-04 19:39:29 +0530:
> Hoping the below helps clear some of this up:
> Executors don't have a way to share data among themselves, except for
> accumulators, which are shared via the driver's support.
> Based on the data locality (local or remote nature of the data),
> tasks/stages are defined to perform the work, which may result in a shuffle.

If I understand correctly:

* Only shuffle data goes through the driver
* The receivers' data stays node-local until a shuffle occurs

Is that right?

> On Wed, Jul 4, 2018 at 1:56 PM, thomas lavocat <
> thomas.lavo...@univ-grenoble-alpes.fr> wrote:
> 
> > Hello,
> >
> > I have a question on Spark Dataflow. If I understand correctly, all
> > received data is sent from the executor to the driver of the application
> > prior to task creation.
> >
> > Then the task embedding the data transits from the driver to the executor
> > in order to be processed.
> >
> > As executors cannot exchange data themselves, in a shuffle, data also
> > transits to the driver.
> >
> > Is that correct ?
> >
> > Thomas
> >
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



structured streaming: how to keep counter of error records in log running streaming application

2018-07-05 Thread chandan prakash
Hi,
I am writing a Structured Streaming application where I process records
after some validation (let's say, not null).
I want to keep a counter of invalid records over the life of the long-running
streaming application while the other, valid records get processed.
How can I achieve this?

My first thought was to use a LongAccumulator, but it seems to be per batch
and not for the life of the application.

If there is any other way or workaround to achieve this, please share.
Thanks in advance.
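One possible workaround, sketched below with illustrative column and path
names: since the foreachBatch function (available from Spark 2.4) runs on
the driver, a driver-side counter updated from it lives for the whole
application. Note that a micro-batch replayed after a failure may count some
records twice.

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.appName("invalid-record-counter").getOrCreate()

// Lives on the driver for the lifetime of the application.
val invalidCount = new AtomicLong(0L)

val inputDF = spark.readStream
  .schema("id INT, name STRING")
  .option("delimiter", ",")
  .csv("src/main/streamingInput")

val query = inputDF.writeStream
  .option("checkpointLocation", "counter-checkpoint")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Count the invalid records in this micro-batch and add to the running total.
    invalidCount.addAndGet(batchDF.filter(col("name").isNull).count())
    // Write the valid records as usual.
    batchDF.filter(col("name").isNotNull)
      .write.mode("append").format("csv").save("valid_output")
  }
  .start()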


Regards,
-- 
Chandan Prakash


Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

2018-07-05 Thread chandan prakash
Hi Amiya/Jürgen,
Did you get any lead on this?
I want to process records after some validation.
Correct records should go to sink1 and incorrect records should go to sink2.
How can I achieve this in a single stream?
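One way to do this, sketched below with illustrative column and sink names,
is the foreachBatch approach described earlier in this digest (available
from Spark 2.4): cache each micro-batch, filter it twice, and write each
part to its own sink. Here streamingDF stands for the input streaming
DataFrame.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.cache()
  batchDF.filter(col("value").isNotNull)        // valid records
    .write.mode("append").format("csv").save("sink1_output")
  batchDF.filter(col("value").isNull)           // invalid records
    .write.mode("append").format("csv").save("sink2_output")
  batchDF.unpersist()
}.start()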

Regards,
Chandan

On Wed, Jun 13, 2018 at 2:30 PM Amiya Mishra 
wrote:

> Hi Jürgen,
>
> Have you found any solution or workaround for multiple sinks from a single
> source, as we cannot process multiple sinks at a time?
>
> I also have a scenario in ETL where we are using a clone component having
> multiple sinks with a single input stream dataframe.
>
> Can you keep posting once you have any solution?
>
> Thanks
> Amiya
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Chandan Prakash