Yarn log aggregation of spark streaming job

2018-09-24 Thread ayushChauhan
By default, YARN aggregates logs only after an application completes. I am
trying to aggregate logs for a Spark Streaming job, which in theory will run
forever. I have set the following properties for log aggregation and
restarted YARN by restarting hadoop-yarn-nodemanager on the core and task
nodes and hadoop-yarn-resourcemanager on the master node of my EMR cluster.
I can see my changes at http://node-ip:8088/conf.

yarn.log-aggregation-enable => true
yarn.log-aggregation.retain-seconds => 172800
yarn.log-aggregation.retain-check-interval-seconds => -1
yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds => 3600
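
As a rough illustration, the four properties listed above could be rendered as yarn-site.xml entries like this. It is only a sketch: the helper name to_yarn_site_xml is made up for the example, the values are copied from this post, and nothing here talks to a cluster or explains why rolling aggregation is not taking effect.

```python
import xml.etree.ElementTree as ET

# Property values copied from the post; nothing here is read from a cluster.
PROPS = {
    "yarn.log-aggregation-enable": "true",
    "yarn.log-aggregation.retain-seconds": "172800",
    "yarn.log-aggregation.retain-check-interval-seconds": "-1",
    "yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds": "3600",
}


def to_yarn_site_xml(props):
    """Hypothetical helper: build a Hadoop-style <configuration> document."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


xml_text = to_yarn_site_xml(PROPS)
print(xml_text)
```

For reference, 172800 seconds is 48 hours and 3600 seconds is one hour.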

All the articles and resources I have found say only to set the
yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds property,
after which YARN will start aggregating logs for running jobs. But it is not
working in my case.





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming RDD Cleanup too slow

2018-09-05 Thread Prashant Sharma
I have a Spark Streaming job that takes too long to delete temporary RDDs. I
collect about 4 million telemetry metrics per minute and do minor
aggregations in the streaming job.

I am using Amazon R4 instances. The driver RPC call, although asynchronous I
believe, is slow to get the handle for the future object at the askAsync call.
Here is the Spark code that does the cleanup:
https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala#L125

Has anyone else encountered a similar issue with their streaming jobs?
About 20% of our time (~60 secs) is spent cleaning up the temporary RDDs.
best,
Prashant


Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-30 Thread Cody Koeninger
You're using an older version of spark, with what looks like a
manually included different version of the kafka-clients jar (1.0)
than what that version of the spark connector was written to depend on
(0.10.0.1), so there's no telling what's going on.
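
One way to see the mismatch described above is to pull the jar names and versions out of the stack trace. The sketch below is a hedged illustration: jar_versions is a hypothetical helper, and the two sample lines are copied from the trace in the original report.

```python
import re

# Matches the "~[artifact-version.jar:" suffix that the logger appends to a frame.
JAR_RE = re.compile(r"~\[(.+?)-(\d[\w.]*)\.jar:")


def jar_versions(trace):
    """Hypothetical helper: map each jar artifact named in a trace to its version."""
    return {name: version for name, version in JAR_RE.findall(trace)}


trace = """
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091) ~[kafka-clients-1.0.0.jar:na]
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
"""

for name, version in sorted(jar_versions(trace).items()):
    print(name, version)
```

Here kafka-clients reports 1.0.0, whereas this connector version was written against 0.10.0.1.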

On Wed, Aug 29, 2018 at 3:40 PM, Guillermo Ortiz Fernández
 wrote:
> I can't... do you think that it's a possible bug of this version?? from
> Spark or Kafka?
>
> On Wed, Aug 29, 2018 at 22:28, Cody Koeninger wrote:
>>
>> Are you able to try a recent version of spark?
>>
>> On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
>>  wrote:
>> > I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
>> > exception and Spark dies.
>> >
>> > I couldn't see any error or problem among the machines, anybody has the
>> > reason about this error?
>> >
>> >
>> > java.lang.IllegalStateException: This consumer has already been closed.

Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Guillermo Ortiz Fernández
I can't... Do you think it could be a bug in this version, in either Spark
or Kafka?

On Wed, Aug 29, 2018 at 22:28, Cody Koeninger wrote:

> Are you able to try a recent version of spark?
>
> On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
>  wrote:
> > I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
> > exception and Spark dies.
> >
> > I couldn't see any error or problem among the machines, anybody has the
> > reason about this error?
> >
> >
> > java.lang.IllegalStateException: This consumer has already been closed.

Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Cody Koeninger
Are you able to try a recent version of spark?

On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
 wrote:
> I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
> exception and Spark dies.
>
> I couldn't see any error or problem among the machines, anybody has the
> reason about this error?
>
>
> java.lang.IllegalStateException: This consumer has already been closed.

Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Guillermo Ortiz Fernández
I'm using Spark Streaming 2.0.1 with Kafka 0.10. Sometimes I get this
exception and Spark dies.

I couldn't see any error or problem on the machines. Does anybody know the
reason for this error?


java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215) ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.11.jar:na]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.11.jar:na]
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) ~[scala-library-2.11.11.jar:na]
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) ~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]


Re: [Spark Streaming] [ML]: Exception handling for the transform method of Spark ML pipeline model

2018-08-17 Thread sudododo
Hi,

Any help on this?

Thanks,






[Spark Streaming] [ML]: Exception handling for the transform method of Spark ML pipeline model

2018-08-16 Thread sudododo
Hi,

I'm implementing a Spark Streaming + ML application. The data arrives in a
Kafka topic in JSON format. The Spark Kafka connector reads the data from
the Kafka topic as a DStream. After several preprocessing steps, the input
DStream is transformed into a feature DStream, which is fed into a Spark ML
pipeline model. The code sample below shows how the feature DStream interacts
with the pipeline model.

def predict_rdd(rdd, pipeline_model):
    # Only score non-empty micro-batches.
    if (rdd is not None) and (not rdd.isEmpty()):
        try:
            df = rdd.toDF()
            predictions = pipeline_model.transform(df)
            return predictions.rdd
        except Exception as e:
            print("Unable to make predictions")
            return None
    else:
        return None

prediction_stream = feature_stream.transform(
    lambda rdd: predict_rdd(rdd, pipeline_model))

Here is the problem. If pipeline_model.transform(df) fails due to data
issues in some rows of df, the try...except block won't catch the exception,
since the exception is thrown in the executors. As a result, the exception
bubbles up to Spark and the streaming application is terminated.
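
A likely contributing factor, stated here as an assumption rather than a diagnosis of this job, is lazy evaluation: transform() only builds a plan, and row-level errors surface later, when an action runs outside the try block. The plain-Python analogy below uses a generator to mimic that behaviour. The names lazy_transform and bad_row are invented, and no Spark API is involved.

```python
def lazy_transform(rows):
    """Toy stand-in for a lazy transformation: nothing runs until iteration."""
    for row in rows:
        if row == "bad_row":
            raise ValueError("cannot score row: %r" % row)
        yield row.upper()


rows = ["a", "bad_row", "b"]

# 1) try/except around the lazy call: no work has happened yet, so nothing is caught.
caught_early = False
try:
    result = lazy_transform(rows)
except ValueError:
    caught_early = True

# 2) The error only appears once the result is actually consumed.
caught_late = False
try:
    list(result)
except ValueError:
    caught_late = True

print(caught_early, caught_late)
```

In Spark terms, the analogous move would be to force evaluation inside the try block, at the cost of computing the batch at that point.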

I want the exception to be caught in such a way that the streaming
application is not terminated and keeps processing incoming data. Is that
possible?

I know one solution would be to do more thorough data validation in the
preprocessing step. However, some sort of error handling should still be in
place for the transform method of the pipeline model in case anything
unexpected happens.
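
The "just in case" handling described above could take the shape of a per-record guard that runs where the data is processed. The sketch below is plain Python under stated assumptions: safe_apply and score are hypothetical names, and score merely stands in for whatever row-level step can fail. It is not the pipeline model's transform method.

```python
def safe_apply(records, fn):
    """Apply fn to each record; yield ('ok', result) or ('error', record)."""
    for record in records:
        try:
            yield ("ok", fn(record))
        except Exception:
            # Keep the failing record so it can be logged or routed elsewhere.
            yield ("error", record)


def score(record):
    """Hypothetical row-level step that fails on malformed input."""
    return 1.0 / float(record)


results = list(safe_apply(["4", "oops", "0", "2"], score))
good = [value for tag, value in results if tag == "ok"]
bad = [value for tag, value in results if tag == "error"]
print(good, bad)
```

A function shaped like this could be handed to a per-partition operation so that one bad row does not fail the whole batch. Whether that fits a given ML pipeline stage depends on the stage.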


Thanks,






Re: How to convert Spark Streaming to Static Dataframe on the fly and pass it to a ML Model as batch

2018-08-14 Thread Gourav Sengupta
Hi,

Or you could just use Structured Streaming:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html


Regards,
Gourav Sengupta

On Tue, Aug 14, 2018 at 10:51 AM, Gerard Maas  wrote:

> Hi Aakash,
>
> In Spark Streaming, foreachRDD gives you access to the data in
> each micro-batch.
> You can transform that RDD into a DataFrame and implement the flow you
> describe.
>
> e.g.:
>
> var historyRDD: RDD[mytype] = sparkContext.emptyRDD
>
> // create Kafka DStream ...
>
> dstream.foreachRDD { rdd =>
>   val allData = historyRDD union rdd
>   // toDF requires the RDD to be of some structured type, i.e. a case class
>   val df = allData.toDF
>   // do something with the dataframe df
>   historyRDD = allData  // this needs checkpointing
> }
> Depending on the volume of data you're dealing with, it might not be
> possible to hold all data in memory.
> Checkpoint of the historyRDD is mandatory to break up the growing lineage
> (union will keep a reference to the previous RDDs and at some point, things
> will blow up)
> So, while this trick might keep data within the Spark boundaries, you
> still need resilient storage to write the checkpoints in order to implement
> a reliable streaming job.
>
> As you are using Kafka, another alternative would be to write the
> transformed data to Kafka and have the training job consume that topic,
> replaying data from the start.
> Confluent has some good resources on how to use "kafka as a storage"
>
> I  hope this helps.
>
> kr, Gerard.
>
> PS: I'm also not sure why you are initially writing the files to Kafka. It
> would be easier to read the files directly from Spark Streaming or
> Structured Streaming.
>
>
>
>
>
> On Tue, Aug 14, 2018 at 9:31 AM Aakash Basu 
> wrote:
>
>> Hi all,
>>
>> The requirement is, to process file using Spark Streaming fed from Kafka
>> Topic and once all the transformations are done, make it a batch of static
>> dataframe and pass it into a Spark ML Model tuning.
>>
>> As of now, I had been doing it in the below fashion -
>>
>> 1) Read the file using Kafka
>> 2) Consume it in Spark using a streaming dataframe
>> 3) Run spark transformation jobs on streaming data
>> 4) Append and write on HDFS.
>> 5) Read the transformed file as batch in Spark
>> 6) Run Spark ML Model
>>
>> But, the requirement is to avoid use of HDFS as it may not be installed
>> in certain clusters, so, we've to avoid the disk I/O and do it on the fly
>> from Kafka to append in a spark static DF and hence pass that DF to the ML
>> Model.
>>
>> How to go about it?
>>
>> Thanks,
>> Aakash.
>>
>


Sending data from ZeroMQ to Spark Streaming API with Python

2018-08-14 Thread oreogundipe
Hi! I'm working on a project and I'm trying to find out whether I can pass
data from ZeroMQ straight into Spark's Python streaming API. I saw some
links, but nothing concrete on how to use it with Python. Can anybody
please point me in the right direction?






Re: How to convert Spark Streaming to Static Dataframe on the fly and pass it to a ML Model as batch

2018-08-14 Thread Gerard Maas
Hi Aakash,

In Spark Streaming, foreachRDD gives you access to the data in
each micro-batch.
You can transform that RDD into a DataFrame and implement the flow you
describe.

e.g.:

var historyRDD: RDD[mytype] = sparkContext.emptyRDD

// create Kafka DStream ...

dstream.foreachRDD { rdd =>
  val allData = historyRDD union rdd
  // toDF requires the RDD to be of some structured type, i.e. a case class
  val df = allData.toDF
  // do something with the dataframe df
  historyRDD = allData  // this needs checkpointing
}
Depending on the volume of data you're dealing with, it might not be
possible to hold all data in memory.
Checkpoint of the historyRDD is mandatory to break up the growing lineage
(union will keep a reference to the previous RDDs and at some point, things
will blow up)
So, while this trick might keep data within the Spark boundaries, you still
need resilient storage to write the checkpoints in order to implement a
reliable streaming job.
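
To make the lineage point concrete, here is a toy model in plain Python. It is not Spark code: ToyRDD, lineage_depth and checkpoint are invented for illustration, and this checkpoint simply returns a parentless copy, which only mimics the effect of a real checkpoint.

```python
class ToyRDD:
    """Toy stand-in for an RDD: holds data plus references to its parents."""

    def __init__(self, data, parents=()):
        self.data = list(data)
        self.parents = tuple(parents)

    def union(self, other):
        # The result keeps references to both inputs, like a lineage graph.
        return ToyRDD(self.data + other.data, parents=(self, other))

    def lineage_depth(self):
        return 1 + max((p.lineage_depth() for p in self.parents), default=0)

    def checkpoint(self):
        # Same data, no parents: the lineage is cut here.
        return ToyRDD(self.data)


def run(batches, checkpoint_every=None):
    """Accumulate history batch by batch, recording lineage depth after each."""
    history = ToyRDD([])
    depths = []
    for i, batch in enumerate(batches, start=1):
        all_data = history.union(ToyRDD(batch))
        if checkpoint_every and i % checkpoint_every == 0:
            all_data = all_data.checkpoint()
        history = all_data
        depths.append(history.lineage_depth())
    return history.data, depths


batches = [[1], [2], [3], [4], [5], [6]]
print(run(batches)[1])                      # depth grows with every batch
print(run(batches, checkpoint_every=3)[1])  # depth is reset periodically
```

Without a checkpoint the depth keeps growing by one per micro-batch. With one every third batch it stays bounded while the accumulated data is unchanged.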

As you are using Kafka, another alternative would be to write the
transformed data to Kafka and have the training job consume that topic,
replaying data from the start.
Confluent has some good resources on how to use "kafka as a storage"

I  hope this helps.

kr, Gerard.

PS: I'm also not sure why you are initially writing the files to Kafka. It
would be easier to read the files directly from Spark Streaming or
Structured Streaming.





On Tue, Aug 14, 2018 at 9:31 AM Aakash Basu 
wrote:

> Hi all,
>
> The requirement is, to process file using Spark Streaming fed from Kafka
> Topic and once all the transformations are done, make it a batch of static
> dataframe and pass it into a Spark ML Model tuning.
>
> As of now, I had been doing it in the below fashion -
>
> 1) Read the file using Kafka
> 2) Consume it in Spark using a streaming dataframe
> 3) Run spark transformation jobs on streaming data
> 4) Append and write on HDFS.
> 5) Read the transformed file as batch in Spark
> 6) Run Spark ML Model
>
> But, the requirement is to avoid use of HDFS as it may not be installed in
> certain clusters, so, we've to avoid the disk I/O and do it on the fly from
> Kafka to append in a spark static DF and hence pass that DF to the ML Model.
>
> How to go about it?
>
> Thanks,
> Aakash.
>


How to convert Spark Streaming to Static Dataframe on the fly and pass it to a ML Model as batch

2018-08-14 Thread Aakash Basu
Hi all,

The requirement is to process a file using Spark Streaming fed from a Kafka
topic and, once all the transformations are done, turn the result into a
static batch DataFrame and pass it to Spark ML model tuning.

Until now, I have been doing it as follows:

1) Read the file using Kafka
2) Consume it in Spark using a streaming dataframe
3) Run spark transformation jobs on streaming data
4) Append and write on HDFS.
5) Read the transformed file as batch in Spark
6) Run Spark ML Model

But the requirement is to avoid HDFS, as it may not be installed on certain
clusters. So we have to avoid the disk I/O and, on the fly, append the data
from Kafka to a static Spark DataFrame and then pass that DataFrame to the
ML model.

How should I go about it?

Thanks,
Aakash.


How does mapPartitions function work in Spark streaming on DStreams?

2018-08-09 Thread zakhavan
Hello,

I am running a Spark Streaming program on a dataset that is a sequence of
numbers in a text file. I read the text file and load it into a Kafka
topic, then run the Spark Streaming program on the DStream, and finally
write the result to an output text file. But I'm getting an almost totally
different result compared to running the program without Spark Streaming.

I'm using mapPartitions and it seems to shuffle the data and mess it up.
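
mapPartitions itself does not shuffle. It calls the function once per partition, with only that partition's elements. A running computation such as a cumulative sum therefore restarts at every partition boundary, which is one plausible reason (an assumption, not a confirmed diagnosis) for results that differ from a single-machine run. The plain-Python sketch below imitates this. map_partitions is a hypothetical stand-in, not the PySpark method.

```python
from itertools import accumulate


def running_sum_of_squares(values):
    """Cumulative sum of squares, the first step of an STA/LTA-style computation."""
    return list(accumulate(v * v for v in values))


def map_partitions(partitions, fn):
    """Hypothetical stand-in: apply fn to each partition independently."""
    out = []
    for part in partitions:
        out.extend(fn(part))
    return out


data = [1, 2, 3, 4, 5, 6]

whole = running_sum_of_squares(data)
split = map_partitions([data[:3], data[3:]], running_sum_of_squares)

print(whole)
print(split)
```

The two results agree only up to the first partition boundary. After that, the per-partition version has lost the running total carried over from earlier elements.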

Here is my code in Spark streaming and using Kafka:

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import pandas as pd

#---
def classic_sta_lta_py(a):
    nsta = 2
    nlta = 30

    print("a=", a)

    sta = np.cumsum(a ** 2)
    print("sta1=", sta)
    #   sta = np.cumsum(a * a, dtype=float)
    #   print("{}. sta array is: ".format(sta))

    # Convert to float (np.float was removed in NumPy 1.24; the builtin
    # float is equivalent)
    sta = np.require(sta, dtype=float)
    print("sta2=", sta)

    # Copy for LTA
    lta = sta.copy()
    print("lta=", lta)

    # Compute the STA and the LTA
    sta[nsta:] = sta[nsta:] - sta[:-nsta]
    sta /= nsta
    lta[nlta:] = lta[nlta:] - lta[:-nlta]
    lta /= nlta

    # Pad zeros
    sta[:nlta - 1] = 0

    # Avoid division by zero by setting zero values to tiny float
    dtiny = np.finfo(0.0).tiny
    idx = lta < dtiny
    lta[idx] = dtiny

    return sta / lta
#---

def trigger_onset(charfct):
    """
    Calculate trigger on and off times.

    Given thres1 and thres2 calculate trigger on and off times from
    characteristic function.

    This method is written in pure Python and gets slow as soon as there
    are more than 1e6 triggerings ("on" AND "off") in charfct --- normally
    this does not happen.

    :type charfct: NumPy :class:`~numpy.ndarray`
    :param charfct: Characteristic function of e.g. STA/LTA trigger
    :type thres1: float
    :param thres1: Value above which trigger (of characteristic function)
                   is activated (higher threshold)
    :type thres2: float
    :param thres2: Value below which trigger (of characteristic function)
        is deactivated (lower threshold)
    :type max_len: int
    :param max_len: Maximum length of triggered event in samples. A new
        event will be triggered as soon as the signal reaches
        again above thres1.
    :type max_len_delete: bool
    :param max_len_delete: Do not write events longer than max_len into
                           report file.
    :rtype: List
    :return: Nested List of trigger on and of times in samples
    """
    # 1) find indices of samples greater than threshold
    # 2) calculate trigger "of" times by the gap in trigger indices
    #    above the threshold i.e. the difference of two following indices
    #    in ind is greater than 1
    # 3) in principle the same as for "of" just add one to the index to get
    #    start times, this operation is not supported on the compact
    #    syntax
    # 4) as long as there is a on time greater than the actual of time find
    #    trigger on states which are greater than last of state an the
    #    corresponding of state which is greater than current on state
    # 5) if the signal stays above thres2 longer than max_len an event
    #    is triggered and following a new event can be triggered as soon as
    #    the signal is above thres1
    thres1 = 4
    thres2 = 2
    max_len = 9e99
    max_len_delete = False
    # charfct = []
    # for x in iterator:
    #     print(x)
    #     charfct.append(x)
    ind1 = np.where(charfct > thres1)[0]
    if len(ind1) == 0:
        return []
    ind2 = np.where(charfct > thres2)[0]
    #
    on = deque([ind1[0]])
    of = deque([-1])
    # determine the indices where charfct falls below off-threshold
    ind2_ = np.empty_like(ind2, dtype=bool)
    ind2_[:-1] = np.diff(ind2) > 1
    # last occurrence is missed by the diff, add it manually
    ind2_[-1] = True
    of.extend(ind2[ind2_].tolist())
    on.extend(ind1[np.where(np.diff(ind1) > 1)[0] + 1].tolist())
    # include last pick if trigger is on or drop it
    if max_len_delete:
        # drop it
        of.extend([1e99])
        on.extend([on[-1]])
    else:
        # include it
        of.extend([ind2[-1]])
    #
    pick = []
    while on[-1] > of[0]:
        while on[0] <= of[0]:
            on.popleft()
        while of[0] < on[0]:
            of.popleft()
        if of[0] - on[0] > max_len:
            if max_len_delete:
                on.popleft()
                continue
            of.appendleft(on[0] + max_len)
        pick.append([on[0], of[0]

Re: How to do PCA with Spark Streaming Dataframe?

2018-07-31 Thread Aakash Basu
FYI

The relevant StackOverflow query on the same -
https://stackoverflow.com/questions/51610482/how-to-do-pca-with-spark-streaming-dataframe

On Tue, Jul 31, 2018 at 3:18 PM, Aakash Basu 
wrote:

> Hi,
>
> Just curious to know, how can we run a Principal Component Analysis on
> streaming data in distributed mode? If we can, is it mathematically valid
> enough?
>
> Have anyone done that before? Can you guys share your experience over it?
> Is there any API Spark provides to do the same on Spark Streaming mode?
>
> Thanks,
> Aakash.
>


How to do PCA with Spark Streaming Dataframe?

2018-07-31 Thread Aakash Basu
Hi,

Just curious to know: how can we run a Principal Component Analysis on
streaming data in distributed mode? If we can, is it mathematically valid
enough?

Has anyone done that before? Can you share your experience with it?
Is there any API Spark provides to do the same in Spark Streaming mode?

Thanks,
Aakash.


Using Spark Streaming for analyzing changing data

2018-07-30 Thread oripwk


We have a use case where there's a stream of events, and every event has an
ID and its current state with a timestamp:

…
111,ready,1532949947
111,offline,1532949955
111,ongoing,1532949955
111,offline,1532949973
333,offline,1532949981
333,ongoing,1532949987
…

We want to ask questions about the current state of the *whole dataset*,
from the beginning of time, such as:
  "how many items are now in ongoing state"

(but bear in mind that there are more complicated questions, and all of them
are asking about the _current_ state of the dataset, from the beginning of
time)

I haven't found any simple, performant way of doing it.

The ways I've found are:
1. Using mapGroupsWithState, where I groupByKey on the ID, and update the
state always for the latest event by timestamp
2. Using groupByKey on the ID, and leaving only the matched event whose
timestamp is the latest

Neither method is good: the first involves state, which means
checkpointing, memory, etc., and the second involves shuffling and sorting.

We will have a lot of such queries in order to populate a real-time
dashboard.

I wonder, as a general question, what is the correct way to process this
type of data in Spark Streaming?
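
To make the question concrete, here is a minimal plain-Python sketch of the
"latest event per ID" logic that either approach above has to implement. It
is not Spark code; the function names are made up for illustration, and the
tie-breaking rule (on equal timestamps the later-seen event wins) is an
assumption, since the post does not say how ties should be handled.

```python
from collections import Counter


def latest_states(events):
    """Return {id: (state, timestamp)}, keeping the newest event per ID.

    Assumption: when two events share a timestamp, the one seen later wins.
    """
    latest = {}
    for item_id, state, ts in events:
        current = latest.get(item_id)
        if current is None or ts >= current[1]:
            latest[item_id] = (state, ts)
    return latest


def count_by_state(latest):
    """Count how many IDs are currently in each state."""
    return Counter(state for state, _ in latest.values())


# The sample events from the post.
events = [
    ("111", "ready", 1532949947),
    ("111", "offline", 1532949955),
    ("111", "ongoing", 1532949955),
    ("111", "offline", 1532949973),
    ("333", "offline", 1532949981),
    ("333", "ongoing", 1532949987),
]

latest = latest_states(events)
counts = count_by_state(latest)
print("items now in ongoing state:", counts["ongoing"])
```

In a streaming job the dictionary would be the per-key state, which is why
the stateful approach needs checkpointing.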







Re: Question of spark streaming

2018-07-27 Thread Arun Mahadevan
“activityQuery.awaitTermination()” is a blocking call.

You can just skip this line and run other commands in the same shell to query
the stream.

Running the query from a different shell won’t help since the memory sink where
the results are stored is not shared between the two shells.

Thanks,
Arun

From: utkarsh rathor 
Date: Friday, July 27, 2018 at 5:15 AM
To: "user@spark.apache.org" 
Subject: Question of spark streaming

 

 

I am following the book Spark the Definitive Guide The following code is 
executed locally using spark-shell

Procedure: Started the spark-shell without any other options
val static = 
spark.read.json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
 
val dataSchema = static.schema
 
val streaming = spark.readStream.schema(dataSchema) 
.option("maxFilesPerTrigger",1).json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
 
val activityCounts = streaming.groupBy("gt").count()
 
val activityQuery  = 
activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()
 
activityQuery.awaitTermination()
The Books says that "After this code is executed the streaming computation will 
have started in the background"  "Now that this stream is running , we can 
experiment with the result by querying"

MY OBSERVATION:

When this code is executed it does not frees the shell for me to type in the 
commands such asspark.streams.active

Hence I cannot query this stream

My resarch

I tried to open a new spark-shell but querying in that shell does not returns 
any results. Are the streams obtained from this shell accessible from other 
another instance of the shell.

I want the table in memory so that I can use the to query using command
for( i <- 1 to 5)
{
spark.sql("SELECT * FROM activity_counts").show()
Thread.sleep(1000)
}



Question of spark streaming

2018-07-27 Thread utkarsh rathor
I am following the book *Spark: The Definitive Guide*. The following
code is *executed locally using spark-shell*.

Procedure: Started the spark-shell without any other options

val static = 
spark.read.json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")

val dataSchema = static.schema

val streaming = spark.readStream.schema(dataSchema)
.option("maxFilesPerTrigger",1).json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")

val activityCounts = streaming.groupBy("gt").count()

val activityQuery  =
activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()

activityQuery.awaitTermination()

The book says that *"After this code is executed the streaming computation
will have started in the background"* and *"Now that this stream is running,
we can experiment with the result by querying"*.

*MY OBSERVATION:*

When this code is executed, it does not free the shell for me to type in
commands such as *spark.streams.active*.

Hence I cannot query this stream.

*My research*

I tried opening a new spark-shell, but querying in that shell does not
return any results. Are the streams started in this shell accessible
from another instance of the shell?

I want the table in memory so that I can query it using a command such as:

for( i <- 1 to 5)
{
spark.sql("SELECT * FROM activity_counts").show()
Thread.sleep(1000)
}


How does a Spark Streaming program call a Python file?

2018-07-25 Thread 康逸之

I am trying to build a real-time system with Spark (written in Scala), but
there are some algorithm files written in Python. How can I call the algorithm
files?

Any idea how to make this work?



Spark streaming connecting to two kafka clusters

2018-07-17 Thread Sathi Chowdhury
Hi,

My question is about the ability to integrate Spark Streaming with multiple
Kafka clusters. Is this a supported use case? An example is two topics
owned by different groups, each with its own Kafka infrastructure. Can I have
two dataframes, as a result of spark.readStream listening to different Kafka
clusters, in the same Spark Streaming job? Has anyone solved this use case
before?

Thanks.
Sathi

Re: Stopping a Spark Streaming Context gracefully

2018-07-15 Thread Dhaval Modi
+1

Regards,
Dhaval Modi
dhavalmod...@gmail.com

On 8 November 2017 at 00:06, Bryan Jeffrey  wrote:

> Hello.
>
> I am running Spark 2.1, Scala 2.11.  We're running several Spark streaming
> jobs.  In some cases we restart these jobs on an occasional basis.  We have
> code that looks like the following:
>
> logger.info("Starting the streaming context!")
> ssc.start()
> logger.info("Waiting for termination!")
> Option(config.getInt(Parameters.RestartMinutes)).getOrElse(0) match {
>   case restartMinutes: Int if restartMinutes > 0 =>
> logger.info(s"Waiting for ${restartMinutes} before terminating job")
> ssc.awaitTerminationOrTimeout(restartMinutes * 
> DateUtils.millisecondsPerMinute)
>   case _ => ssc.awaitTermination()
> }
> logger.info("Calling 'stop'")
> ssc.stop(stopSparkContext = true, stopGracefully = true)
>
>
> In several cases we've observed jobs where we've called 'stop' not
> stopping.  I went and wrote a simple job that reads from Kafka and does
> nothing (prints a count of data).  After several minutes it simply calls
> 'ssc.stop(true, true)'.  In some cases this will stop the context.  In
> others it will not stop the context.  If we call 'stop' several times over
> an interval one of them eventually succeeds.
>
> It looks like this is a bug.  I looked in Jira and did not see an open
> issue.  Is this a  known problem?  If not I'll open a bug.
>
> Regards,
>
> Bryan Jeffrey
>
>
>
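
The quoted message notes that calling 'stop' several times over an interval
eventually succeeds. A minimal plain-Python sketch of that retry pattern is
below. It is a workaround sketch only, not a fix for the underlying
behaviour. The helper name, the `is_stopped` callable and the `FlakyContext`
stand-in are all hypothetical; how to detect that a real streaming context
has stopped is left to the caller.

```python
import time


def stop_with_retries(stop, is_stopped, attempts=5, wait_s=1.0,
                      sleep=time.sleep):
    """Call stop() until is_stopped() is true.

    Returns the attempt number that succeeded, or None if none did.
    """
    for attempt in range(1, attempts + 1):
        stop()
        if is_stopped():
            return attempt
        sleep(wait_s)
    return None


class FlakyContext:
    """Hypothetical stand-in: stop() only takes effect on the Nth call."""

    def __init__(self, succeeds_on):
        self.calls = 0
        self.stopped = False
        self._succeeds_on = succeeds_on

    def stop(self):
        self.calls += 1
        if self.calls >= self._succeeds_on:
            self.stopped = True


ctx = FlakyContext(succeeds_on=3)
attempt = stop_with_retries(ctx.stop, lambda: ctx.stopped, wait_s=0.0)
print(f"stopped after {attempt} attempt(s)")
```

Bounding the number of attempts keeps a job that truly cannot stop from
looping forever; the caller can then fall back to killing the application.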


Run STA/LTA python function using spark streaming: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

2018-07-10 Thread zakhavan
Hello,

I'm trying to run the STA/LTA Python code, which I got from the ObsPy website,
using Spark Streaming and plot the events, but I keep getting the following
error:

"java.lang.IllegalArgumentException: requirement failed: No output
operations registered, so nothing to execute"

Here is the code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ---
# Filename: trigger.py
#  Purpose: Python trigger/picker routines for seismology.
#   Author: Moritz Beyreuther, Tobias Megies
#Email: moritz.beyreut...@geophysik.uni-muenchen.de
#
# Copyright (C) 2008-2012 Moritz Beyreuther, Tobias Megies
# ---

from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import *  # NOQA

from collections import deque
import ctypes as C
import warnings

import numpy as np

from obspy import UTCDateTime
from obspy.signal.cross_correlation import templates_max_similarity
from obspy.signal.headers import clibsignal, head_stalta_t
from numpy import genfromtxt
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext





def classic_sta_lta_py(a, nsta, nlta):
    """
    Computes the standard STA/LTA from a given input array a. The length of
    the STA is given by nsta in samples, respectively is the length of the
    LTA given by nlta in samples. Written in Python.

    .. note::

        There exists a faster version of this trigger wrapped in C
        called :func:`~obspy.signal.trigger.classic_sta_lta` in this module!

    :type a: NumPy :class:`~numpy.ndarray`
    :param a: Seismic Trace
    :type nsta: int
    :param nsta: Length of short time average window in samples
    :type nlta: int
    :param nlta: Length of long time average window in samples
    :rtype: NumPy :class:`~numpy.ndarray`
    :return: Characteristic function of classic STA/LTA
    """
    # The cumulative sum can be exploited to calculate a moving average (the
    # cumsum function is quite efficient)
    sta = np.cumsum(a ** 2)

    # Convert to float (the np.float alias was removed in NumPy 1.24; the
    # builtin float is equivalent)
    sta = np.require(sta, dtype=float)

    # Copy for LTA
    lta = sta.copy()

    # Compute the STA and the LTA
    sta[nsta:] = sta[nsta:] - sta[:-nsta]
    sta /= nsta
    lta[nlta:] = lta[nlta:] - lta[:-nlta]
    lta /= nlta

    # Pad zeros
    sta[:nlta - 1] = 0

    # Avoid division by zero by setting zero values to tiny float
    dtiny = np.finfo(0.0).tiny
    idx = lta < dtiny
    lta[idx] = dtiny

    return sta / lta






def trigger_onset(charfct, thres1, thres2, max_len=9e99,
                  max_len_delete=False):
    """
    Calculate trigger on and off times.

    Given thres1 and thres2 calculate trigger on and off times from
    characteristic function.

    This method is written in pure Python and gets slow as soon as there
    are more then 1e6 triggerings ("on" AND "off") in charfct --- normally
    this does not happen.

    :type charfct: NumPy :class:`~numpy.ndarray`
    :param charfct: Characteristic function of e.g. STA/LTA trigger
    :type thres1: float
    :param thres1: Value above which trigger (of characteristic function)
        is activated (higher threshold)
    :type thres2: float
    :param thres2: Value below which trigger (of characteristic function)
        is deactivated (lower threshold)
    :type max_len: int
    :param max_len: Maximum length of triggered event in samples. A new
        event will be triggered as soon as the signal reaches
        again above thres1.
    :type max_len_delete: bool
    :param max_len_delete: Do not write events longer than max_len into
        report file.
    :rtype: List
    :return: Nested List of trigger on and of times in samples
    """
    # 1) find indices of samples greater than threshold
    # 2) calculate trigger "of" times by the gap in trigger indices
    #    above the threshold i.e. the difference of two following indices
    #    in ind is greater than 1
    # 3) in principle the same as for "of" just add one to the index to get
    #    start times, this operation is not supported on the compact
    #    syntax
    # 4) as long as there is a on time greater than the actual of time find
    #    trigger on states which are greater than last of state an the
    #    corresponding of state which is greater than current on state
    # 5) if the signal stays above thres2 longer than max_len an event
    #    is triggered and following a new event can be triggered as soon as
    #    the signal is above thres1
    ind1 = np.where(charfct > thres1)[0]
    if len(ind1) == 0:
        return []
    ind2 = np.where(charfct > thres2)[0]
    #
    on = deque([ind1[0]])
    of = deque([-1])
    # determine the indices where char

Re: [Spark Streaming MEMORY_ONLY] Understanding Dataflow

2018-07-05 Thread Thomas Lavocat
Excerpts from Prem Sure's message of 2018-07-04 19:39:29 +0530:
> Hoping below would help in clearing some..
> executors dont have control to share the data among themselves except
> sharing accumulators via driver's support.
> Its all based on the data locality or remote nature, tasks/stages are
> defined to perform which may result in shuffle.

If I understand correctly :

* Only shuffle data goes through the driver
* The receivers data stays node local until a shuffle occurs

Is that right ?

> On Wed, Jul 4, 2018 at 1:56 PM, thomas lavocat <
> thomas.lavo...@univ-grenoble-alpes.fr> wrote:
> 
> > Hello,
> >
> > I have a question on Spark Dataflow. If I understand correctly, all
> > received data is sent from the executor to the driver of the application
> > prior to task creation.
> >
> > Then the task embeding the data transit from the driver to the executor in
> > order to be processed.
> >
> > As executor cannot exchange data themselves, in a shuffle, data also
> > transit to the driver.
> >
> > Is that correct ?
> >
> > Thomas
> >
> >
> >
> >




Re: [Spark Streaming MEMORY_ONLY] Understanding Dataflow

2018-07-04 Thread Prem Sure
Hoping the below helps clear some of this up.
Executors have no way to share data among themselves, except for
sharing accumulators with the driver's support.
It all depends on data locality (local or remote): tasks and stages are
defined accordingly, and running them may result in a shuffle.

On Wed, Jul 4, 2018 at 1:56 PM, thomas lavocat <
thomas.lavo...@univ-grenoble-alpes.fr> wrote:

> Hello,
>
> I have a question on Spark Dataflow. If I understand correctly, all
> received data is sent from the executor to the driver of the application
> prior to task creation.
>
> Then the task embeding the data transit from the driver to the executor in
> order to be processed.
>
> As executor cannot exchange data themselves, in a shuffle, data also
> transit to the driver.
>
> Is that correct ?
>
> Thomas
>
>
>
>


[Spark Streaming MEMORY_ONLY] Understanding Dataflow

2018-07-04 Thread thomas lavocat

Hello,

I have a question on Spark dataflow. If I understand correctly, all
received data is sent from the executor to the driver of the application
prior to task creation.

Then the task embedding the data transits from the driver to the executor
in order to be processed.

As executors cannot exchange data among themselves, in a shuffle the data
also transits through the driver.

Is that correct?

Thomas





union of multiple twitter streams [spark-streaming-twitter_2.11]

2018-07-02 Thread Imran Rajjad
Hello,

Has anybody tried to union two streams of Twitter Statuses? I am
instantiating two Twitter streams through two different sets of credentials
and passing them through a union function, but the console does not show
any activity, nor are there any errors.


--static function that returns JavaReceiverInputDStream--



public static JavaReceiverInputDStream
getTwitterStream(JavaStreamingContext spark, String consumerKey, String
consumerSecret,String accessToken, String accessTokenSecret,String[]
filter) {
  // Enable Oauth
  ConfigurationBuilder cb = new ConfigurationBuilder();
  cb.setDebugEnabled(false)
.setOAuthConsumerKey(consumerKey).setOAuthConsumerSecret(consumerSecret)

.setOAuthAccessToken(accessToken).setOAuthAccessTokenSecret(accessTokenSecret)
.setJSONStoreEnabled(true);
  TwitterFactory tf = new TwitterFactory(cb.build());
  Twitter twitter = tf.getInstance();

  // Create stream
  return TwitterUtils.createStream(spark,
twitter.getAuthorization(),filter);
 }
---trying to union two twitter streams---

JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.minutes(5));

jssc.sparkContext().setLogLevel("ERROR");


JavaReceiverInputDStream twitterStreamByHashtag =
TwitterUtil.getTwitterStream(jssc, consumerKey1, consumerSecret1,
accessToken1, accessTokenSecret1,new String[]{"#Twitter"});
  // JavaReceiverInputDStream twitterStreamByUser =
TwitterUtil.getTwitterStream(jssc, consumerKey2, consumerSecret2,
accessToken2, accessTokenSecret2,new String[]{"@Twitter"});


JavaDStream statuses = twitterStreamByHashtag
.union(twitterStreamByUser)
.map(s->{return s.getText();});


regards,
Imran

-- 
I.R


Spark Streaming PID rate controller minRate default value

2018-06-29 Thread faxianzhao
Hi, there

I think "spark.streaming.backpressure.pid.minRate" should be left unset by
default, like "spark.streaming.backpressure.initialRate".

The default value of 100 is not good for my business.

It would be better to explain it in more detail in the documentation and let
users make the decision themselves, as with
"spark.streaming.backpressure.initialRate".
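
To illustrate why the default matters, here is a minimal plain-Python sketch
of a lower bound on an estimated ingestion rate. It assumes, as far as I
understand the PID rate estimator, that the computed rate is never allowed
to drop below minRate. The function name and the sample numbers are made up
for illustration; this is not Spark's code.

```python
def bounded_rate(estimated_rate, min_rate=100.0):
    """Apply a lower bound to an estimated rate (records per second)."""
    return max(estimated_rate, min_rate)


# With the default bound, a low estimate is raised to 100 records/s.
for estimate in (20.0, 250.0):
    print(estimate, "->", bounded_rate(estimate))

# With a lower, explicitly chosen bound the low estimate is kept.
print(20.0, "->", bounded_rate(20.0, min_rate=5.0))
```

A job whose sustainable rate is below the bound would therefore keep
receiving more records than it can process, which is presumably the problem
described above.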









Re: spark 2.3.1 with kafka spark-streaming-kafka-0-10 (java.lang.AbstractMethodError)

2018-06-28 Thread Peter Liu
Hello there,

I just upgraded from Spark 2.2.1 to Spark 2.3.1, ran my streaming workload,
and got an error (java.lang.AbstractMethodError) that I had never seen before;
see the stack trace in (a) below.

Does anyone know whether Spark 2.3.1 works well with Kafka
spark-streaming-kafka-0-10?

The Spark Kafka integration page doesn't mention any limitation:
https://spark.apache.org/docs/2.3.1/streaming-kafka-integration.html

But this discussion seems to say there is indeed an issue when upgrading to
Spark 2.3.1:
https://stackoverflow.com/questions/49180931/abstractmethoderror-creating-kafka-stream

I also rebuilt the workload with some Spark 2.3.1 jars (see (b) below). It
doesn't seem to help.

It would be great if anyone could share any insights here.

Thanks!

Peter

(a) the exception
Exception in thread "stream execution thread for [id =
5adae836-268a-4ebf-adc4-e3cc9fbe5acf, runId =
70e78d5c-665e-4c6f-a0cc-41a56e488e30]" java.lang.AbstractMethodError
at
org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.initializeLogIfNecessary(KafkaSourceProvider.scala:369)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.log(KafkaSourceProvider.scala:369)
at
org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.logDebug(KafkaSourceProvider.scala:369)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:439)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:394)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
at
org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:277)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:80)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:77)
at
scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at
scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:77)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:75)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:75)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org
$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

(b) *the build script update:*

[pgl@datanode20 SparkStreamingBenchmark-RemoteConsumer-Spk231]$ diff build.sbt spk211-build.sbt.original
10,11c10,11
< libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.3.1"
< libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.3.1"
---
> libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.2.1"
> libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.1"
[pgl@datanode20 SparkStreamingBenchmark-RemoteConsumer-Spk231]$
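
One thing that may be worth checking (the diff above cannot confirm it) is
whether every Spark artifact in the build is on the same version, since the
diff only shows spark-sql and spark-core being changed. Below is a minimal
plain-Python sketch that groups the org.apache.spark artifacts in a build
file by version. The third dependency line in the sample is hypothetical and
only there to show what a mismatch would look like; the function name is
made up too.

```python
import re

# Matches: "org.apache.spark" % "artifact" % "version"  (also %% for the first)
DEPENDENCY = re.compile(
    r'"org\.apache\.spark"\s*%{1,2}\s*"([^"]+)"\s*%\s*"([^"]+)"'
)


def spark_artifacts_by_version(build_text):
    """Return {version: [artifact, ...]} for org.apache.spark dependencies."""
    by_version = {}
    for artifact, version in DEPENDENCY.findall(build_text):
        by_version.setdefault(version, []).append(artifact)
    return {version: sorted(names) for version, names in by_version.items()}


sample = '''
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.3.1"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.3.1"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.1"
'''  # the last line is a hypothetical example, not taken from the post

by_version = spark_artifacts_by_version(sample)
for version in sorted(by_version):
    print(version, by_version[version])
if len(by_version) > 1:
    print("Spark artifacts are on different versions")
```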


Re: [Spark Streaming] Spark Streaming with S3 vs Kinesis

2018-06-28 Thread Farshid Zavareh
Thanks.

A workaround I can think of is to rename/move the objects which have been
processed to a different prefix (which is not monitored). But with the
StreamingContext.textFileStream method there doesn't seem to be a way to
know where each record is coming from. Is there another way to do this?

On Wed, Jun 27, 2018 at 12:26 AM Steve Loughran 
wrote:

>
> On 25 Jun 2018, at 23:59, Farshid Zavareh  wrote:
>
> I'm writing a Spark Streaming application where the input data is put into
> an S3 bucket in small batches (using Database Migration Service - DMS). The
> Spark application is the only consumer. I'm considering two possible
> architectures:
>
> Have Spark Streaming watch an S3 prefix and pick up new objects as they
> come in
> Stream data from S3 to a Kinesis stream (through a Lambda function
> triggered as new S3 objects are created by DMS) and use the stream as input
> for the Spark application.
> While the second solution will work, the first solution is simpler. But
> are there any pitfalls? Looking at this guide, I'm concerned about two
> specific points:
>
> > *The more files under a directory, the longer it will take to scan for
> changes — even if no files have been modified.*
>
> We will be keeping the S3 data indefinitely. So the number of objects
> under the prefix being monitored is going to increase very quickly.
>
>
>
> Theres a slightly-more-optimised streaming source for cloud streams here
>
>
> https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/org/apache/spark/streaming/hortonworks/CloudInputDStream.scala
>
>
> Even so, the cost of scanning S3 is one LIST request per 5000 objects;
> I'll leave it to you to work out how many there will be in your application
> —and how much it will cost. And of course, the more LIST calls tehre are,
> the longer things take, the bigger your window needs to be.
>
>
> > *“Full” Filesystems such as HDFS tend to set the modification time on
> their files as soon as the output stream is created. When a file is opened,
> even before data has been completely written, it may be included in the
> DStream - after which updates to the file within the same window will be
> ignored. That is: changes may be missed, and data omitted from the stream.*
>
> I'm not sure if this applies to S3, since to my understanding objects are
> created atomically and cannot be updated afterwards as is the case with
> ordinary files (unless deleted and recreated, which I don't believe DMS
> does)
>
>
> Objects written to S3 are't visible until the upload completes, in an
> atomic operation. You can write in place and not worry.
>
> The timestamp on S3 artifacts comes from the PUT tim. On multipart uploads
> of many MB/many GB uploads, thats when the first post to initiate the MPU
> is kicked off. So if the upload starts in time window t1 and completed in
> window t2, the object won't be visible until t2, but the timestamp will be
> of t1. Bear that  in mind.
>
> The lambda callback probably does have better scalability and resilience;
>  not tried it myself.
>
>
>
>
>
> Thanks for any help!
>
>
>


Re: [Spark Streaming] Spark Streaming with S3 vs Kinesis

2018-06-26 Thread Steve Loughran


On 25 Jun 2018, at 23:59, Farshid Zavareh <fhzava...@gmail.com> wrote:

I'm writing a Spark Streaming application where the input data is put into an 
S3 bucket in small batches (using Database Migration Service - DMS). The Spark 
application is the only consumer. I'm considering two possible architectures:

Have Spark Streaming watch an S3 prefix and pick up new objects as they come in
Stream data from S3 to a Kinesis stream (through a Lambda function triggered as 
new S3 objects are created by DMS) and use the stream as input for the Spark 
application.
While the second solution will work, the first solution is simpler. But are 
there any pitfalls? Looking at this guide, I'm concerned about two specific 
points:

> The more files under a directory, the longer it will take to scan for changes 
> — even if no files have been modified.

We will be keeping the S3 data indefinitely. So the number of objects under the 
prefix being monitored is going to increase very quickly.


There's a slightly-more-optimised streaming source for cloud streams here:

https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/org/apache/spark/streaming/hortonworks/CloudInputDStream.scala


Even so, the cost of scanning S3 is one LIST request per 5000 objects; I'll
leave it to you to work out how many there will be in your application, and how
much it will cost. And of course, the more LIST calls there are, the longer
things take, and the bigger your window needs to be.
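
As a rough worked example of that arithmetic, here is a small Python sketch.
The 5000-objects-per-LIST figure is the one quoted above; the object count
and batch interval are made-up inputs, and the function names are only for
illustration.

```python
import math


def list_requests_per_scan(object_count, page_size=5000):
    """LIST requests needed to enumerate a prefix once (at least one)."""
    return max(1, math.ceil(object_count / page_size))


def list_requests_per_day(object_count, batch_interval_s, page_size=5000):
    """LIST requests per day if the prefix is rescanned every batch."""
    scans_per_day = 86400 // batch_interval_s
    return scans_per_day * list_requests_per_scan(object_count, page_size)


# Hypothetical workload: one million objects, one scan per minute.
per_scan = list_requests_per_scan(1_000_000)
per_day = list_requests_per_day(1_000_000, batch_interval_s=60)
print(per_scan, "LIST requests per scan,", per_day, "per day")
```

Because the object count only grows when data is kept indefinitely, the
per-scan figure grows with it, which is the concern raised in the question.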


> “Full” Filesystems such as HDFS tend to set the modification time on their 
> files as soon as the output stream is created. When a file is opened, even 
> before data has been completely written, it may be included in the DStream - 
> after which updates to the file within the same window will be ignored. That 
> is: changes may be missed, and data omitted from the stream.

I'm not sure if this applies to S3, since to my understanding objects are 
created atomically and cannot be updated afterwards as is the case with 
ordinary files (unless deleted and recreated, which I don't believe DMS does)


Objects written to S3 aren't visible until the upload completes, in an atomic
operation. You can write in place and not worry.

The timestamp on S3 artifacts comes from the PUT time. On multipart uploads of
many MB or many GB, that's when the first POST to initiate the MPU is
kicked off. So if the upload starts in time window t1 and completes in window
t2, the object won't be visible until t2, but the timestamp will be of t1. Bear
that in mind.
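
The t1/t2 effect can be shown with a deliberately simplified model. The
Python sketch below is not how Spark's file stream is implemented; it just
selects objects whose timestamp falls in the current window and that are
already visible, with an optional look-back. All names and numbers are
hypothetical.

```python
def select_new_objects(objects, window_start, window_end, lookback=0, seen=()):
    """Pick objects for one scan taken at window_end.

    objects: iterable of (name, timestamp, visible_at).
    An object is picked if it is visible by window_end, was not picked
    before, and its timestamp lies in [window_start - lookback, window_end).
    """
    selected = []
    for name, timestamp, visible_at in objects:
        if name in seen or visible_at > window_end:
            continue
        if window_start - lookback <= timestamp < window_end:
            selected.append(name)
    return selected


objects = [
    ("small.csv", 12, 12),    # written and visible in window t2 = [10, 20)
    ("big-mpu.csv", 8, 14),   # timestamp in t1 = [0, 10), visible in t2
]

first = select_new_objects(objects, 0, 10)
second = select_new_objects(objects, 10, 20)
second_with_lookback = select_new_objects(objects, 10, 20, lookback=10,
                                          seen=first)
print(first, second, second_with_lookback)
```

Without a look-back the multipart object is never picked up in this model,
because it only becomes visible after the window matching its timestamp has
already been scanned.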

The Lambda callback probably does have better scalability and resilience; I
have not tried it myself.





Thanks for any help!



Re: [Spark Streaming] Measure latency

2018-06-26 Thread Gerard Maas
Hi Daniele,

A pragmatic approach to do that would be to execute the computations in the
scope of a foreachRDD, surrounded by wall-clock timers.
For example:
dstream.foreachRDD{ rdd =>
   val t0 = System.currentTimeMillis()
   val aggregates = rdd.
   // make sure you get a result here, not another RDD.
   // Otherwise you need to do something like rdd.count to materialize it
   val elapsedTime = System.currentTimeMillis() - t0
   println(s"operation took $elapsedTime")
}

In the end, this will result in the same performance as the batch spark
engine, so you might want to check the performance there first.
If you want to add the stream ingestion time to this, it gets a bit more
tricky.
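
For reference, the same wall-clock idea in Python: a minimal sketch of a
timing wrapper around an aggregation. The names `timed` and `average` are
made up for illustration, and the list stands in for the data of one batch.
In PySpark the wrapped function would be called inside the function passed
to foreachRDD, around an action that materializes a result.

```python
import time


def timed(operation, clock=time.perf_counter):
    """Wrap `operation` so each call returns (result, elapsed_seconds)."""
    def wrapper(*args, **kwargs):
        t0 = clock()
        result = operation(*args, **kwargs)
        return result, clock() - t0
    return wrapper


def average(values):
    """Stand-in aggregation; returns None for an empty batch."""
    values = list(values)
    return sum(values) / len(values) if values else None


timed_average = timed(average)
result, elapsed = timed_average([1.0, 2.0, 3.0])
print(f"average={result} took {elapsed:.6f}s")
```

As with the Scala version, the timer only means something if the wrapped
call returns a concrete value rather than another lazy dataset.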

kind regards, Gerard.



On Tue, Jun 26, 2018 at 11:49 AM Daniele Foroni 
wrote:

> Hi all,
>
> I am using spark streaming and I need to evaluate the latency of the
> standard aggregations (avg, min, max, …) provided by the spark APIs.
> Any way to do it in the code?
>
> Thanks in advance,
> ---
> Daniele
>
>


[Spark Streaming] Measure latency

2018-06-26 Thread Daniele Foroni
Hi all,

I am using spark streaming and I need to evaluate the latency of the standard 
aggregations (avg, min, max, …) provided by the spark APIs.
Any way to do it in the code?

Thanks in advance,
---
Daniele



[Spark Streaming] Spark Streaming with S3 vs Kinesis

2018-06-25 Thread Farshid Zavareh
I'm writing a Spark Streaming application where the input data is put into
an S3 bucket in small batches (using Database Migration Service - DMS). The
Spark application is the only consumer. I'm considering two possible
architectures:

1. Have Spark Streaming watch an S3 prefix and pick up new objects as they
come in.
2. Stream data from S3 to a Kinesis stream (through a Lambda function
triggered as new S3 objects are created by DMS) and use the stream as input
for the Spark application.

While the second solution will work, the first solution is simpler. But are
there any pitfalls? Looking at this guide, I'm concerned about two specific
points:

> *The more files under a directory, the longer it will take to scan for
changes — even if no files have been modified.*

We will be keeping the S3 data indefinitely. So the number of objects under
the prefix being monitored is going to increase very quickly.

> *“Full” Filesystems such as HDFS tend to set the modification time on
their files as soon as the output stream is created. When a file is opened,
even before data has been completely written, it may be included in the
DStream - after which updates to the file within the same window will be
ignored. That is: changes may be missed, and data omitted from the stream.*

I'm not sure if this applies to S3, since to my understanding objects are
created atomically and cannot be updated afterwards as is the case with
ordinary files (unless deleted and recreated, which I don't believe DMS
does)

Thanks for any help!


[Spark Streaming] Are SparkListener/StreamingListener callbacks called concurrently?

2018-06-20 Thread Majid Azimi
Hi,

What is the concurrency model behind SparkListener or StreamingListener
callbacks?
1. Multiple threads might access callbacks simultaneously.
2. Callbacks are guaranteed to be executed by a single thread. (Thread ids might
change on consecutive calls, though.)

I asked the same question on Stack Overflow and waited for about one day. Since
there was no response, I'm reposting it here.

https://stackoverflow.com/questions/50921585/are-sparklistener-streaminglistener-callbacks-called-concurrently

[Spark Streaming]: How do I apply window before filter?

2018-06-11 Thread Tejas Manohar
Hey friends,

We're trying to make some batched computations run against an OLAP DB
closer to "realtime". One of our more complex computations is a trigger
when event A occurs but not event B within a given time period. Our
experience with Spark is limited, but since Spark 2.3.0 just introduced
Stream-Stream Joins and our data is already in Kafka, we thought we'd try
it out.

That said, in our exploration, we've been running into an issue where Spark
optimizes the Kafka *watermark* to be applied *after* the *filter* in our
SQL query. This means the watermark won't move forward unless there's data
within the filtered results, and thus the trigger for "event B did not
occur" won't fire until another "event B" arrives, which can be problematic
depending on how granular the filter is.

See the quick isolated example I setup in *spark-shell* below.

```
scala> :paste
// Entering paste mode (ctrl-D to finish)

val kafka =
spark.readStream.format("kafka").option("kafka.bootstrap.servers",
*":"*).option("subscribe", "**").option("startingOffsets",
"latest").load()

import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("message", StructType(Seq(
    StructField("event", StringType),
    StructField("timestamp", TimestampType)
  )))
))

val parsed = kafka.select(from_json($"value".cast(StringType), schema) as
'data).select($"data.*", $"data.message.timestamp" as
'ts).withWatermark("ts", "10 seconds")

// Exiting paste mode, now interpreting.
scala> parsed.filter("message.event = 'Item Added'").as('a).join(
     |   parsed.filter("message.event = 'Item Purchased'") as 'b,
     |   expr("a.id = b.id AND a.ts < b.ts AND b.ts < a.ts + interval 5 seconds"),
     |   joinType = "left").explain()
== Physical Plan ==
StreamingSymmetricHashJoin [id#24], [id#37], LeftOuter, condition = [
leftOnly = null, rightOnly = null, both = ((ts#23-T1ms <
ts#39-T1ms) && (ts#39-T1ms < ts#23-T1ms + interval 5 seconds)),
full = ((ts#23-T1ms < ts#39-T1ms) && (ts#39-T1ms <
ts#23-T1ms + interval 5 seconds)) ], state info [ checkpoint =
, runId = 52d0e4a5-150c-4136-8542-c2c5e4bb59c2, opId = 0, ver = 0,
numPartitions = 4], 0, state cleanup [ left value predicate:
(ts#23-T1ms <= -500), right value predicate: (ts#39-T1ms <= 0) ]
:- Exchange hashpartitioning(id#24, 4)
:  +- EventTimeWatermark ts#23: timestamp, interval 10 seconds
: +- Project [jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).id AS id#24,
jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message AS message#25,
jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message.timestamp AS ts#23]
:+- Filter (jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message.event = Item Added)
:   +- StreamingRelation kafka, [key#7, value#8, topic#9,
partition#10, offset#11L, timestamp#12, timestampType#13]
+- Exchange hashpartitioning(id#37, 4)
   +- *(1) Filter isnotnull(ts#39-T1ms)
  +- EventTimeWatermark ts#39: timestamp, interval 10 seconds
 +- Project [jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).id AS id#37,
jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message AS message#38,
jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message.timestamp AS ts#39]
+- Filter ((jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message.event = Item Purchased) &&
isnotnull(jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).id))
   +- StreamingRelation kafka, [key#7, 
```

Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-11 Thread thomas lavocat

Thank you very much for your answer.

Since I don't have dependent jobs I will continue to use this functionality.


On 05/06/2018 13:52, Saisai Shao wrote:
> "dependent" I mean this batch's job relies on the previous batch's
> result. So this batch should wait for the finish of previous batch, if
> you set "spark.streaming.concurrentJobs" larger than 1, then the
> current batch could start without waiting for the previous batch (if
> it is delayed), which will lead to unexpected results.
>
> thomas lavocat <thomas.lavo...@univ-grenoble-alpes.fr> wrote on Tue, Jun 5, 2018 at 7:48 PM:
>
>> On 05/06/2018 13:44, Saisai Shao wrote:
>>
>>> You need to read the code, this is an undocumented configuration.
>>
>> I'm on it right now, but, Spark is a big piece of software.
>>
>>> Basically this will break the ordering of Streaming jobs, AFAIK
>>> it may get unexpected results if you streaming jobs are not
>>> independent.
>>
>> What do you mean exactly by not independent ?
>> Are several source joined together dependent ?
>>
>> Thanks,
>> Thomas
>>
>>> thomas lavocat <thomas.lavo...@univ-grenoble-alpes.fr> wrote on Tue, Jun 5, 2018 at 7:17 PM:
>>>
>>>> Hello,
>>>>
>>>> Thank's for your answer.
>>>>
>>>> On 05/06/2018 11:24, Saisai Shao wrote:
>>>>
>>>>> spark.streaming.concurrentJobs is a driver side internal
>>>>> configuration, this means that how many streaming jobs can
>>>>> be submitted concurrently in one batch. Usually this should
>>>>> not be configured by user, unless you're familiar with Spark
>>>>> Streaming internals, and know the implication of this
>>>>> configuration.
>>>>
>>>> How can I find some documentation about those implications ?
>>>>
>>>> I've experimented some configuration of this parameters and
>>>> found out that my overall throughput is increased in
>>>> correlation with this property.
>>>> But I'm experiencing scalability issues. With more than 16
>>>> receivers spread over 8 executors, my executors no longer
>>>> receive work from the driver and fall idle.
>>>> Is there an explanation ?
>>>>
>>>> Thanks,
>>>> Thomas







[Spark Streaming] Distinct Count on unrelated columns

2018-06-06 Thread Aakash Basu
Hi guys,

Posted a question (link) on StackOverflow, any help?


Thanks,
Aakash.


Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread Saisai Shao
By "dependent" I mean that this batch's job relies on the previous batch's
result, so this batch should wait for the previous batch to finish. If you
set "spark.streaming.concurrentJobs" larger than 1, the current batch can
start without waiting for the previous batch (if it is delayed), which will
lead to unexpected results.
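
The ordering point can be illustrated without Spark. What follows is only a minimal sketch, assuming the setting simply sizes a fixed driver-side thread pool on which each batch's jobs run; `completionOrder` and the sleep times are made up for the illustration and are not Spark APIs.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical stand-in for the driver-side job executor: a fixed pool whose
// size plays the role of spark.streaming.concurrentJobs (an assumption).
def completionOrder(concurrentJobs: Int, batchDurationsMs: Seq[Long]): List[Int] = {
  val pool = Executors.newFixedThreadPool(concurrentJobs)
  val lock = new Object
  var finished = Vector.empty[Int]
  batchDurationsMs.zipWithIndex.foreach { case (ms, batch) =>
    pool.execute(new Runnable {
      override def run(): Unit = {
        Thread.sleep(ms)                            // simulated processing time
        lock.synchronized { finished :+= batch }    // record completion order
      }
    })
  }
  pool.shutdown()
  pool.awaitTermination(10, TimeUnit.SECONDS)
  lock.synchronized(finished.toList)
}

// Batch 0 is delayed; batches 1 and 2 are fast.
val durations = Seq(400L, 20L, 20L)
val sequential = completionOrder(1, durations) // one thread: submission order kept
val concurrent = completionOrder(2, durations) // two threads: batch 0 is overtaken
```

With a single thread the batches complete in the order they were submitted; with two, the delayed batch finishes after the later ones, which is harmless for independent batches and a problem when a batch needs the previous batch's result.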


thomas lavocat wrote on Tue, Jun 5, 2018 at 7:48 PM:

>
> On 05/06/2018 13:44, Saisai Shao wrote:
>
> You need to read the code, this is an undocumented configuration.
>
> I'm on it right now, but, Spark is a big piece of software.
>
> Basically this will break the ordering of Streaming jobs, AFAIK it may get
> unexpected results if you streaming jobs are not independent.
>
> What do you mean exactly by not independent ?
> Are several source joined together dependent ?
>
> Thanks,
> Thomas
>
>
> thomas lavocat wrote on Tue, Jun 5, 2018 at 7:17 PM:
>
>> Hello,
>>
>> Thank's for your answer.
>>
>> On 05/06/2018 11:24, Saisai Shao wrote:
>>
>> spark.streaming.concurrentJobs is a driver side internal configuration,
>> this means that how many streaming jobs can be submitted concurrently in
>> one batch. Usually this should not be configured by user, unless you're
>> familiar with Spark Streaming internals, and know the implication of this
>> configuration.
>>
>>
>> How can I find some documentation about those implications ?
>>
>> I've experimented some configuration of this parameters and found out
>> that my overall throughput is increased in correlation with this property.
>> But I'm experiencing scalability issues. With more than 16 receivers
>> spread over 8 executors, my executors no longer receive work from the
>> driver and fall idle.
>> Is there an explanation ?
>>
>> Thanks,
>> Thomas
>>
>>
>


Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread thomas lavocat


On 05/06/2018 13:44, Saisai Shao wrote:

> You need to read the code, this is an undocumented configuration.

I'm on it right now, but, Spark is a big piece of software.

> Basically this will break the ordering of Streaming jobs, AFAIK it may
> get unexpected results if you streaming jobs are not independent.

What do you mean exactly by not independent ?
Are several source joined together dependent ?

Thanks,
Thomas

> thomas lavocat <thomas.lavo...@univ-grenoble-alpes.fr> wrote on Tue, Jun 5, 2018 at 7:17 PM:
>
>> Hello,
>>
>> Thank's for your answer.
>>
>> On 05/06/2018 11:24, Saisai Shao wrote:
>>
>>> spark.streaming.concurrentJobs is a driver side internal
>>> configuration, this means that how many streaming jobs can be
>>> submitted concurrently in one batch. Usually this should not be
>>> configured by user, unless you're familiar with Spark Streaming
>>> internals, and know the implication of this configuration.
>>
>> How can I find some documentation about those implications ?
>>
>> I've experimented some configuration of this parameters and found
>> out that my overall throughput is increased in correlation with
>> this property.
>> But I'm experiencing scalability issues. With more than 16
>> receivers spread over 8 executors, my executors no longer receive
>> work from the driver and fall idle.
>> Is there an explanation ?
>>
>> Thanks,
>> Thomas





Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread Saisai Shao
You need to read the code; this is an undocumented configuration.

Basically this will break the ordering of streaming jobs. AFAIK you may get
unexpected results if your streaming jobs are not independent.

thomas lavocat wrote on Tue, Jun 5, 2018 at 7:17 PM:

> Hello,
>
> Thank's for your answer.
>
> On 05/06/2018 11:24, Saisai Shao wrote:
>
> spark.streaming.concurrentJobs is a driver side internal configuration,
> this means that how many streaming jobs can be submitted concurrently in
> one batch. Usually this should not be configured by user, unless you're
> familiar with Spark Streaming internals, and know the implication of this
> configuration.
>
>
> How can I find some documentation about those implications ?
>
> I've experimented some configuration of this parameters and found out that
> my overall throughput is increased in correlation with this property.
> But I'm experiencing scalability issues. With more than 16 receivers
> spread over 8 executors, my executors no longer receive work from the
> driver and fall idle.
> Is there an explanation ?
>
> Thanks,
> Thomas
>
>


Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread thomas lavocat

Hello,

Thank's for your answer.


On 05/06/2018 11:24, Saisai Shao wrote:
> spark.streaming.concurrentJobs is a driver side internal
> configuration, this means that how many streaming jobs can be
> submitted concurrently in one batch. Usually this should not be
> configured by user, unless you're familiar with Spark Streaming
> internals, and know the implication of this configuration.


How can I find some documentation about those implications ?

I've experimented some configuration of this parameters and found out 
that my overall throughput is increased in correlation with this property.
But I'm experiencing scalability issues. With more than 16 receivers 
spread over 8 executors, my executors no longer receive work from the 
driver and fall idle.

Is there an explanation ?

Thanks,
Thomas



Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread Saisai Shao
spark.streaming.concurrentJobs is a driver-side internal configuration. It
controls how many streaming jobs can be submitted concurrently in one
batch. Usually it should not be configured by the user, unless you're
familiar with Spark Streaming internals and know the implications of this
configuration.



thomas lavocat wrote on Tue, Jun 5, 2018 at 4:20 PM:

> Hi everyone,
>
> I'm wondering if the property  spark.streaming.concurrentJobs should
> reflects the total number of possible concurrent task on the cluster, or
> the a local number of concurrent tasks on one compute node.
>
> Thanks for your help.
>
> Thomas
>


[Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread thomas lavocat

Hi everyone,

I'm wondering if the property spark.streaming.concurrentJobs should
reflect the total number of possible concurrent tasks on the cluster, or
a local number of concurrent tasks on one compute node.


Thanks for your help.

Thomas





How to work around NoOffsetForPartitionException when using Spark Streaming

2018-06-01 Thread Martin Peng
Hi,

We see the exception below when using the Spark Streaming Kafka 0.10
integration on a normal Kafka topic. I'm not sure why the offset is missing
in ZK, but since Spark Streaming overrides the offset reset policy to none
in the code, I cannot set the reset policy to latest (I don't really care
about data loss right now).

Is there any quick way to fix the missing offset or work around this?

Thanks,
Martin

1/06/2018 17:11:02: ERROR:the type of error is
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined
offset with no reset policy for partition:
elasticsearchtopicrealtimereports-97
01/06/2018 17:11:02: ERROR:Undefined offset with no reset policy for
partition: elasticsearchtopicrealtimereports-97
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:248)
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1601)
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034)
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:184)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.immutable.List.foreach(List.scala:381)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
scala.collection.immutable.List.map(List.scala:285)
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330

Spark streaming with kafka input stuck in (Re-)joining group because of group rebalancing

2018-05-15 Thread JF Chen
When I terminate a Spark Streaming application and restart it, it always
gets stuck at this step:
>
> Revoking previously assigned partitions [] for group [mygroup]
> (Re-)joining group [mygroup]


If I use a new group id, it works fine, but I may lose the data from the
point where the previous group id last read.

How can I solve this?


Regards,
Junfeng Chen


Making spark streaming application single threaded

2018-05-09 Thread ravidspark
Hi All,

Is there any property that makes my Spark Streaming application
single-threaded?

I looked into the property *spark.dynamicAllocation.maxExecutors=1*, but
as far as I understand it limits the application to at most one container,
not to a single thread. In local mode, we can configure the number of
threads using local[*]. But how can I do the same in cluster mode?

I am trying to read data from Kafka, and I see in my logs that every Kafka
message is being read 3 times. I want each message to be read only once.
How can I achieve this?


Thanks in advance, 
Ravi






Re: [Spark Streaming]: Does DStream workload run over Spark SQL engine?

2018-05-02 Thread Saisai Shao
No, DStream is built on top of RDDs, so it will not leverage any Spark SQL
related features. I think you should use Structured Streaming instead, which
is based on Spark SQL.

Khaled Zaouk <khaledz...@gmail.com> wrote on Wed, May 2, 2018 at 4:51 PM:

> Hi,
>
> I have a question regarding the execution engine of Spark Streaming
> (DStream API): Does Spark streaming jobs run over the Spark SQL engine?
>
> For example, if I change a configuration parameter related to Spark SQL
> (like spark.sql.streaming.minBatchesToRetain or
> spark.sql.objectHashAggregate.sortBased.fallbackThreshold), does this
> make any difference when I run Spark streaming job (using DStream API)?
>
> Thank you!
>
> Khaled
>


[Spark Streaming]: Does DStream workload run over Spark SQL engine?

2018-05-02 Thread Khaled Zaouk
Hi,

I have a question regarding the execution engine of Spark Streaming
(DStream API): Do Spark Streaming jobs run over the Spark SQL engine?

For example, if I change a configuration parameter related to Spark SQL
(like spark.sql.streaming.minBatchesToRetain or
spark.sql.objectHashAggregate.sortBased.fallbackThreshold), does this
make any difference when I run a Spark Streaming job (using the DStream API)?

Thank you!

Khaled


re: spark streaming / AnalysisException on collect()

2018-04-30 Thread Peter Liu
 Hello there,

I have a quick question regarding how to share data (a small data
collection) between a kafka producer and consumer using spark streaming
(spark 2.2):

(A)
the data published by a kafka producer is received in order on the kafka
consumer side (see (a) copied below).

(B)
however, collect() or cache() on a streaming dataframe does not seem to be
supported (see links in (b) below). I got this:
Exception in thread "DataProducer" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;

(C)
My question would be:

--- How can I use the collection data (on a streaming dataframe) arrived on
the consumer side, e.g convert it to an array of objects?
--- Maybe there's another quick way to use kafka for sharing static data
(instead of streaming) between two spark application services (without any
common spark context and session etc.)?

I have copied some code snippet in (c).

It seems to be a very simple use case to share a global collection
between a Spark producer and consumer. But I spent an entire day trying
various options and going through online resources such as
google-general/apache-spark/stackoverflow/cloudera/etc.

Any help would be very much appreciated!

Thanks!

Peter

(a) streaming data (df) received on the consumer side (console sink):

root
 |-- ad_id: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

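-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------+------------------------------------+-----------------------+
|ad_id                               |campaign_id                         |timestamp              |
+------------------------------------+------------------------------------+-----------------------+
|b5629b58-376e-462c-9e65-726184390c84|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|64e93f73-15bb-478c-9f96-fd38f6a24da2|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|05fa1349-fcb3-432e-9b58-2bb0559859a2|060810fd-0430-444f-808c-8a177613226a|2018-04-27 14:35:45.478|
|ae0a176e-236a-4d3a-acb9-141157e81568|42b68023-6a3a-4618-a54a-e6f71df26710|2018-04-27 14:35:45.484|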

(b) online discussions on unsupported operations on streaming dataframe:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations


https://stackoverflow.com/questions/42062092/why-does-using-cache-on-streaming-datasets-fail-with-analysisexception-queries


(c) code snippet:

OK:

val rawDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("startingOffsets", "earliest")
  .option("subscribe", Variables.CAMPAIGNS_TOPIC)
  .load()

OK:

val mySchema = StructType(Array(
  StructField("ad_id", StringType),
  StructField("campaign_id", StringType)))

val campaignsDf2 = campaignsDf
  .select(from_json($"value", mySchema).as("data"), $"timestamp")
  .select("data.*", "timestamp")

OK:

// Trigger once, since this is one-time static data.
campaignsDf2.writeStream
  .format("console")
  .option("truncate", "false")
  .trigger(org.apache.spark.sql.streaming.Trigger.Once())
  .start()
  .awaitTermination()


Exception:
val campaignsArrayRows = campaignsDf2.collect() // not supported: AnalysisException!


Spark Streaming for more file types

2018-04-27 Thread रविशंकर नायर
All,

I have the following methods in my scala code, currently executed on demand

val files = sc.binaryFiles("file:///imocks/data/ocr/raw")
// The line above picks up all PDF files
files.map(myconverter(_)).count


myconverter signature:

def myconverter(
    file: (String, org.apache.spark.input.PortableDataStream)
): Unit = {
  // Code to interact with IBM Datamap OCR, which converts the PDF files into text
}

I want to change the above code to Spark Streaming. Unfortunately there is
no "binaryFiles" function on StreamingContext (it would definitely be a
great addition to Spark). The closest I can think of is to write something
like this:

//Assuming myconverter is not changed

val dstream = ssc.fileStream[BytesWritable,BytesWritable,
SequenceFileAsBinaryInputFormat]("file:///imocks/data/ocr/raw") ;


dstream.map(myconverter(_))

Unfortunately nothing works now. There are errors saying the method
signature does not match, and so on. Can anyone please help me get past
this issue? I appreciate your help.

Also, wouldn't it be an excellent idea to make all methods of
SparkContext reusable for StreamingContext as well? That way, it would
take no extra effort to change a batch program into a streaming app.

Best,
Passion


Re: schema change for structured spark streaming using jsonl files

2018-04-25 Thread Michael Segel
Hi,

This is going to sound complicated.

Taken as an individual JSON document, because it’s a self-contained schema doc,
it’s structured. However, there isn’t a persistent schema that has to be
consistent across multiple documents, so you can consider it semi-structured.

If you’re parsing the JSON document and storing different attributes in
separate columns… you will have a major issue, because it’s possible for a JSON
document to contain a new element that isn’t in your Parquet schema.

If you are going from JSON to Parquet… you will probably be better off storing
a serialized version of the JSON doc and then storing highlighted attributes in
separate columns.

HTH

-Mike


> On Apr 23, 2018, at 1:46 PM, Lian Jiang <jiangok2...@gmail.com> wrote:
> 
> Hi,
> 
> I am using structured spark streaming which reads jsonl files and writes into 
> parquet files. I am wondering what's the process if jsonl files schema change.
> 
> Suppose jsonl files are generated in \jsonl folder and the old schema is { 
> "field1": String}. My proposal is:
> 
> 1. write the jsonl files with new schema (e.g. {"field1":String, 
> "field2":Int}) into another folder \jsonl2
> 2. let spark job complete handling all data in \jsonl, then stop the spark 
> streaming job.
> 3. use a spark script to convert the parquet files from old schema to new 
> schema (e.g. add a new column with some default value for "field2").
> 4. upgrade and start the spark streaming job for handling the new schema 
> jsonl files and parquet files.
> 
> Is this process correct (best)? Thanks for any clue.



Re: schema change for structured spark streaming using jsonl files

2018-04-24 Thread Lian Jiang
Thanks for any help!

On Mon, Apr 23, 2018 at 11:46 AM, Lian Jiang <jiangok2...@gmail.com> wrote:

> Hi,
>
> I am using structured spark streaming which reads jsonl files and writes
> into parquet files. I am wondering what's the process if jsonl files schema
> change.
>
> Suppose jsonl files are generated in \jsonl folder and the old schema is {
> "field1": String}. My proposal is:
>
> 1. write the jsonl files with new schema (e.g. {"field1":String,
> "field2":Int}) into another folder \jsonl2
> 2. let spark job complete handling all data in \jsonl, then stop the spark
> streaming job.
> 3. use a spark script to convert the parquet files from old schema to new
> schema (e.g. add a new column with some default value for "field2").
> 4. upgrade and start the spark streaming job for handling the new schema
> jsonl files and parquet files.
>
> Is this process correct (best)? Thanks for any clue.
>


schema change for structured spark streaming using jsonl files

2018-04-23 Thread Lian Jiang
Hi,

I am using Spark Structured Streaming, which reads jsonl files and writes
Parquet files. I am wondering what the process should be if the schema of
the jsonl files changes.

Suppose jsonl files are generated in \jsonl folder and the old schema is {
"field1": String}. My proposal is:

1. write the jsonl files with new schema (e.g. {"field1":String,
"field2":Int}) into another folder \jsonl2
2. let spark job complete handling all data in \jsonl, then stop the spark
streaming job.
3. use a spark script to convert the parquet files from old schema to new
schema (e.g. add a new column with some default value for "field2").
4. upgrade and start the spark streaming job for handling the new schema
jsonl files and parquet files.

Is this process correct (best)? Thanks for any clue.


Re: How to bulk insert using spark streaming job

2018-04-19 Thread scorpio
You need to insert per partition per batch. Database connectors written
for Spark normally have a bulk update feature built in: they take an RDD
and do a bulk insert per partition.
If the db driver you are using doesn't provide this feature, you can
aggregate the records of each partition and send them to the db with your
own code.
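
As a rough illustration of the "your own code" route, here is a minimal sketch of chunking one partition's records and handing each chunk to a bulk writer. `writeInBatches` and the in-memory `received` buffer are hypothetical names used only for this example; the real bulk call depends on the database client.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: drain one partition's records in fixed-size chunks and
// hand each chunk to a caller-supplied bulk writer. Returns the number of
// bulk calls that were made.
def writeInBatches[T](records: Iterator[T], batchSize: Int)(bulkWrite: Seq[T] => Unit): Int = {
  var calls = 0
  records.grouped(batchSize).foreach { chunk =>
    bulkWrite(chunk) // one round trip to the database per chunk
    calls += 1
  }
  calls
}

// Stand-in for a database: remembers what each bulk call received.
val received = ArrayBuffer.empty[Seq[Int]]
val calls = writeInBatches((1 to 7).iterator, batchSize = 3) { chunk =>
  received += chunk
  ()
}
```

In a streaming job the helper would typically be called from inside `foreachRDD` and `foreachPartition`, with the connection opened once per partition rather than once per record.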






Re: How to bulk insert using spark streaming job

2018-04-19 Thread ayan guha
By writing code, I suppose :) Jokes apart, I think you need to describe
the problem in more detail for others to help.

Do you mean you want to batch up data in memory and then write it as a chunk?
Where do you want to insert? Etc etc...

On Fri, Apr 20, 2018 at 1:08 PM, amit kumar singh <amitiem...@gmail.com>
wrote:

> How to bulk insert using spark streaming job
>
> Sent from my iPhone
>



-- 
Best Regards,
Ayan Guha


How to bulk insert using spark streaming job

2018-04-19 Thread amit kumar singh
How to bulk insert using spark streaming job 

Sent from my iPhone

Re: In spark streaming application how to distinguish between normal and abnormal termination of application?

2018-04-15 Thread Igor Makhlin
Looks like nobody knows the answer to this question ;)

On Sat, Mar 31, 2018 at 1:59 PM, Igor Makhlin <igor.makh...@gmail.com>
wrote:

> Hi All,
>
> I'm looking for a way to distinguish between normal and abnormal
> termination of a spark streaming application with (checkpointing enabled).
>
> Adding application listener doesn't really help because onApplicationEnd
> event has no information regarding the cause of the termination.
>
> ssc_.sc.addSparkListener(new SparkListener {
>   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): 
> Unit = {
>
> I need to manage an internal metadata if streaming application has been
> terminated and that termination is not recoverable I have to delete the
> metadata (state of stream in this particular application).
>
>
>
> --
> Sincerely,
>
> Igor Makhlin
>



-- 
Sincerely,

Igor Makhlin


Re: Testing spark streaming action

2018-04-10 Thread Jörn Franke
Run it as part of integration testing. You can still use ScalaTest, but with a
different subfolder (it or integrationtest) instead of test.
Within integrationtest you create a local Spark server, which also has
accumulators.

> On 10. Apr 2018, at 17:35, Guillermo Ortiz  wrote:
> 
> I have a unitTest in SparkStreaming which has an input parameters.
> -DStream[String]
> 
> Inside of the code I want to update an LongAccumulator. When I execute the 
> test I get an NullPointerException because the accumulator doesn't exist. Is 
> there any way to test this?
> 
> My accumulator is updated in different methods. 
> 
> def execute(stream: DStream[String]): Unit = {
>   stream.foreachRDD { rdd =>
>     rdd.foreach { r =>
>       if (r == "A") {
>         acc.add(1)
>         sendKafka(...)
>       }
>     }
>   }
> }
> 
> It's possible to test this kind of method?
> 
> runAction[String](input, service.execute)
> 
> When it try to update the accumulator it doesn't work because it doesn't have 
> inited. I could add a new parameter to the execute method, and it's okay, but 
> runAction doesn't admint more parameters either. 




Testing spark streaming action

2018-04-10 Thread Guillermo Ortiz
I have a unit test in Spark Streaming which takes one input parameter:
- DStream[String]

Inside the code I want to update a LongAccumulator. When I execute the
test I get a NullPointerException because the accumulator doesn't exist.
Is there any way to test this?

My accumulator is updated in different methods.

def execute(stream: DStream[String]): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreach { r =>
      if (r == "A") {
        acc.add(1)
        sendKafka(...)
      }
    }
  }
}

Is it possible to test this kind of method?

runAction[String](input, service.execute)

When it tries to update the accumulator it doesn't work, because the
accumulator has not been initialized. I could add a new parameter to the
execute method, and that would be okay, but runAction doesn't accept more
parameters either.
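
One way around a single-argument runner, sketched in plain Python rather
than Scala: build the action with a factory that closes over the accumulator,
so the runner still receives a function of one argument. FakeAccumulator,
make_execute and the send callback are hypothetical names used only for this
illustration; with a real local Spark context the accumulator would come from
the context instead.

```python
class FakeAccumulator:
    """Test double exposing the add/value surface used by the action."""

    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n


def make_execute(acc, send):
    """Return a one-argument action that closes over its dependencies."""

    def execute(records):
        for r in records:
            if r == "A":
                acc.add(1)
                send(r)

    return execute


sent = []
acc = FakeAccumulator()
execute = make_execute(acc, sent.append)  # still a function of one argument
execute(["A", "B", "A"])
```

The same shape works for the real job: the production code passes in the
registered accumulator and the Kafka sender, and the test passes in doubles.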


In spark streaming application how to distinguish between normal and abnormal termination of application?

2018-03-31 Thread Igor Makhlin
Hi All,

I'm looking for a way to distinguish between normal and abnormal
termination of a Spark Streaming application (with checkpointing enabled).

Adding an application listener doesn't really help, because the
onApplicationEnd event carries no information about the cause of the
termination.

ssc_.sc.addSparkListener(new SparkListener {
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    // applicationEnd only carries the end time, not the reason for termination
  }
})

I need to manage internal metadata: if the streaming application has been
terminated and that termination is not recoverable, I have to delete the
metadata (the state of the stream in this particular application).
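
A possible direction, sketched in plain Python under the assumption that the
blocking wait call re-raises failures from the streaming job (as far as I
know, StreamingContext.awaitTermination does this): classify the termination
in the driver rather than in the listener. The function and callback names
below are hypothetical, and deciding which exceptions count as unrecoverable
is left to the application.

```python
def run_until_terminated(await_termination, on_unrecoverable):
    """Block until the application ends and report how it ended.

    await_termination: callable that blocks and re-raises any job failure.
    on_unrecoverable: cleanup hook, called only on abnormal termination.
    """
    try:
        await_termination()
    except Exception as exc:
        on_unrecoverable(exc)
        return "abnormal"
    return "normal"


def clean_stop():
    """Stand-in for a wait call that returns after a normal stop."""
    return None


def failing_stop():
    """Stand-in for a wait call that re-raises a job failure."""
    raise RuntimeError("receiver failed")


cleaned = []
normal = run_until_terminated(clean_stop, cleaned.append)
abnormal = run_until_terminated(failing_stop, cleaned.append)
```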



-- 
Sincerely,

Igor Makhlin


Transaction Example for spark streaming in Spark2.2

2018-03-22 Thread KhajaAsmath Mohammed
Hi Cody,

I am following your example to implement exactly-once semantics, storing
the offsets in a database. My question is how to use Hive instead of a
traditional datastore: the write to Hive will succeed even if there is an
issue with saving the offsets to the DB. Could you please correct me
if I am wrong, or let me know if you have any other suggestions?

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    DB.localTx { implicit session =>
      // Write data to Hive after creating dataframes from DStream RDD

      // Store Offsets to DB
      offsetRanges.foreach { osr =>
        val offsetRows = sql"""
          update txn_offsets set offset = ${osr.untilOffset}
          where topic = ${osr.topic} and part = ${osr.partition} and offset = ${osr.fromOffset}
          """.update.apply()
        if (offsetRows != 1) {
          throw new Exception(s"""
            Got $offsetRows rows affected instead of 1 when attempting to update offsets for
            ${osr.topic} ${osr.partition} ${osr.fromOffset} -> ${osr.untilOffset}
            Was a partition repeated after a worker failure?
            """)
        }
      }
    }
  }
}

Thanks,
Asmath
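
For reference, the pattern in the code above (results and offsets committed
in one transaction, with a compare-and-set on the offset) can be sketched in
plain Python with sqlite3. This is only an illustration under stated
assumptions: the results table and store_batch are hypothetical, and sqlite3
stands in for a transactional store. It does not make a Hive write
transactional, which is exactly the gap the question is about.

```python
import sqlite3


def store_batch(conn, rows, topic, part, from_offset, until_offset):
    """Write results and advance the offset in one transaction."""
    with conn:  # commits on success, rolls back if anything raises
        conn.executemany("INSERT INTO results VALUES (?)", [(r,) for r in rows])
        cur = conn.execute(
            'UPDATE txn_offsets SET "offset" = ? '
            'WHERE topic = ? AND part = ? AND "offset" = ?',
            (until_offset, topic, part, from_offset),
        )
        if cur.rowcount != 1:
            raise RuntimeError(
                f"Got {cur.rowcount} rows affected instead of 1 for "
                f"{topic} {part} {from_offset} -> {until_offset}"
            )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE results (value TEXT)")
conn.execute('CREATE TABLE txn_offsets (topic TEXT, part INTEGER, "offset" INTEGER)')
conn.execute("INSERT INTO txn_offsets VALUES ('t', 0, 0)")
conn.commit()

store_batch(conn, ["a", "b"], "t", 0, 0, 2)  # first attempt succeeds
try:
    store_batch(conn, ["a", "b"], "t", 0, 0, 2)  # replayed batch is rejected
    replay_rejected = False
except RuntimeError:
    replay_rejected = True

result_count = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
stored_offset = conn.execute('SELECT "offset" FROM txn_offsets').fetchone()[0]
```

Because the replayed batch fails the offset check, its inserts are rolled
back together with the update, so the results are stored only once.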


Wait for 30 seconds before terminating Spark Streaming

2018-03-21 Thread Aakash Basu
Hi,

Using: *Spark 2.3 + Kafka 0.10*


How can I wait for 30 seconds after the latest batch and, if there is no
more streaming data, exit gracefully?

Is it done by running -

query.awaitTermination(30)


Or is it something else?

I tried this, keeping -

option("startingOffsets", "latest")

for both of the input streams to be joined.

I first start the Spark job and then push the data from both CSV files
into the two respective Kafka topics, but I get the following error -


ERROR MicroBatchExecution:91 - Query [id =
21c96e5c-770d-4d59-8893-4401217120b6, runId =
e03d7ac9-97b6-442e-adf4-dd232f9ed616] terminated with error
org.apache.spark.SparkException: Writing job aborted.


When I keep -

option("startingOffsets", "earliest")

The first batch output works perfectly and then terminates after the given
time.

Please help!

Thanks,
Aakash.
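
A note on the question above: in PySpark, query.awaitTermination(30) waits
at most 30 seconds in total and returns whether the query has terminated; it
does not measure idle time. (In the Scala API the timeout is given in
milliseconds.) A hedged sketch of an idle timeout in plain Python follows.
wait_until_idle and rows_in_last_batch are hypothetical names, and the clock
and sleep functions are injectable only so that the logic can be checked
without real waiting.

```python
import time


def wait_until_idle(rows_in_last_batch, idle_seconds=30.0, poll_seconds=1.0,
                    clock=time.monotonic, sleep=time.sleep):
    """Return once no new rows have been seen for idle_seconds."""
    last_data = clock()
    while clock() - last_data < idle_seconds:
        if rows_in_last_batch() > 0:
            last_data = clock()  # data arrived, restart the idle window
        sleep(poll_seconds)


class FakeClock:
    """Deterministic clock for the demonstration below."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


fake = FakeClock()
batches = iter([5, 3, 0, 0])  # rows seen at each poll, then nothing


def rows():
    return next(batches, 0)


wait_until_idle(rows, idle_seconds=3, poll_seconds=1, clock=fake, sleep=fake.sleep)
```

In a real job rows_in_last_batch might read numInputRows from
query.lastProgress, and query.stop() would be called after the function
returns. A real implementation would also have to check that the progress
entry is new (for example by comparing batch ids), since lastProgress keeps
returning the same entry until another one is reported.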


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-21 Thread Aakash Basu
Thanks Chris!

On Fri, Mar 16, 2018 at 10:13 PM, Bowden, Chris <chris.bow...@microfocus.com
> wrote:

> 2. You must decide. If multiple streaming queries are launched in a single
> / simple application, only you can dictate if a single failure should cause
> the application to exit. If you use spark.streams.awaitAnyTermination be
> aware it returns / throws if _any_ streaming query terminates. A more
> complex application may keep track of many streaming queries and attempt to
> relaunch them with lower latency for certain types of failures.
>
>
> 3a. I'm not very familiar with py, but I doubt you need the sleep
>
> 3b. Kafka consumer tuning is simply a matter of passing appropriate config
> keys to the source's options if desired
>
> 3c. I would argue the most obvious improvement would be a more structured
> and compact data format if CSV isn't required.
>
> --
> *From:* Aakash Basu <aakash.spark@gmail.com>
> *Sent:* Friday, March 16, 2018 9:12:39 AM
> *To:* sagar grover
> *Cc:* Bowden, Chris; Tathagata Das; Dylan Guedes; Georg Heiler; user;
> jagrati.go...@myntra.com
>
> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi all,
>
> From the last mail queries in the bottom, query 1's doubt has been
> resolved, I was already guessing so, that I resent same columns from Kafka
> producer multiple times, hence the join gave duplicates.
>
> Retested with fresh Kafka feed and problem was solved.
>
> But, the other queries still persists, would anyone like to reply? :)
>
> Thanks,
> Aakash.
>
> On 16-Mar-2018 3:57 PM, "Aakash Basu" <aakash.spark@gmail.com> wrote:
>
> Hi all,
>
> The code was perfectly alright, just the package I was submitting had to
> be the updated one (marked green below). The join happened but the output
> has many duplicates (even though the *how *parameter is by default *inner*)
> -
>
> Spark Submit:
>
> /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 
> /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
>
>
> Code:
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>
>     table1_stream = (spark.readStream.format("kafka")
>                      .option("startingOffsets", "earliest")
>                      .option("kafka.bootstrap.servers", "localhost:9092")
>                      .option("subscribe", "test1").load())
>
>     table2_stream = (spark.readStream.format("kafka")
>                      .option("startingOffsets", "earliest")
>                      .option("kafka.bootstrap.servers", "localhost:9092")
>                      .option("subscribe", "test2").load())
>
>     query1 = table1_stream.select('value') \
>         .withColumn('value', table1_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
>         .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
>         .drop('value')
>
>     query2 = table2_stream.select('value') \
>         .withColumn('value', table2_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("Department", split(col("value"), ",").getItem(1)) \
>         .withColumn("Date_joined", split(col("value"), ",").getItem(2)) \
>         .drop('value')
>
>     joined_Stream = query1.join(query2, "Id")
>
>     a = query1.writeStream.format("console").start()
>     b = query2.writeStream.format("console").start()
>     c = joined_Stream.writeStream.format("console").start()
>
>     time.sleep(10)
>
>     a.awaitTermination()
>     b.awaitTermination()
>     c.awaitTermination()
>
>
>
>
> Output -
>
> +---+----------+---------+-----------+-----------+
> | ID|First_Name|Last_Name| Department|Date_joined|
> +---+----------+---------+-----------+-----------+
> |  3|     Tobit|Robardley| Accounting|   8/3/2006|
> |  3|     Tobit|Robardley| Accounting|   8/3/2006|
> |  3|     Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread Aakash Basu
wrote:
>>>
>>>> Awesome, thanks for detailing!
>>>>
>>>> Was thinking the same, we've to split by comma for csv while casting
>>>> inside.
>>>>
>>>> Cool! Shall try it and revert back tomm.
>>>>
>>>> Thanks a ton!
>>>>
>>>> On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>>>> wrote:
>>>>
>>>>> To remain generic, the KafkaSource can only offer the lowest common
>>>>> denominator for a schema (topic, partition, offset, key, value, timestamp,
>>>>> timestampType). As such, you can't just feed it a StructType. When you are
>>>>> using a producer or consumer directly with Kafka, serialization and
>>>>> deserialization is often an orthogonal and implicit transform. However, in
>>>>> Spark, serialization and deserialization is an explicit transform (e.g.,
>>>>> you define it in your query plan).
>>>>>
>>>>>
>>>>> To make this more granular, if we imagine your source is registered as
>>>>> a temp view named "foo":
>>>>>
>>>>> SELECT
>>>>>
>>>>>   split(cast(value as string), ',')[0] as id,
>>>>>
>>>>>   split(cast(value as string), ',')[1] as name
>>>>>
>>>>> FROM foo;
>>>>>
>>>>>
>>>>> Assuming you were providing the following messages to Kafka:
>>>>>
>>>>> 1,aakash
>>>>>
>>>>> 2,tathagata
>>>>>
>>>>> 3,chris
>>>>>
>>>>>
>>>>> You could make the query plan less repetitive. I don't believe Spark
>>>>> offers from_csv out of the box as an expression (although CSV is well
>>>>> supported as a data source). You could implement an expression by reusing 
>>>>> a
>>>>> lot of the supporting CSV classes which may result in a better user
>>>>> experience vs. explicitly using split and array indices, etc. In this
>>>>> simple example, casting the binary to a string just works because there is
>>>>> a common understanding of string's encoded as bytes between Spark and 
>>>>> Kafka
>>>>> by default.
>>>>>
>>>>>
>>>>> -Chris
>>>>> --
>>>>> *From:* Aakash Basu <aakash.spark@gmail.com>
>>>>> *Sent:* Thursday, March 15, 2018 10:48:45 AM
>>>>> *To:* Bowden, Chris
>>>>> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
>>>>>
>>>>> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>>>>>
>>>>> Hey Chris,
>>>>>
>>>>> You got it right. I'm reading a *csv *file from local as mentioned
>>>>> above, with a console producer on Kafka side.
>>>>>
>>>>> So, as it is a csv data with headers, shall I then use from_csv on the
>>>>> spark side and provide a StructType to shape it up with a schema and then
>>>>> cast it to string as TD suggested?
>>>>>
>>>>> I'm getting all of your points at a very high level. A little more
>>>>> granularity would help.
>>>>>
>>>>> *In the slide TD just shared*, PFA, I'm confused at the point where
>>>>> he is casting the value as string. Logically, the value shall consist of
>>>>> all the entire data set, so, suppose, I've a table with many columns, *how
>>>>> can I provide a single alias as he did in the groupBy. I missed it there
>>>>> itself. Another question is, do I have to cast in groupBy itself? Can't I
>>>>> do it directly in a select query? The last one, if the steps are followed,
>>>>> can I then run a SQL query on top of the columns separately?*
>>>>>
>>>>> Thanks,
>>>>> Aakash.
>>>>>
>>>>>
>>>>> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>>>>> wrote:
>>>>>
>>>>> You need to tell Spark about the structure of the data, it doesn't
>>>>> know ahead of time if you put avro, json, protobuf, etc. in kafka for the
>>>>> message format. If the messages are in json, Spark provides from_json out
>>>>> of the

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread Aakash Basu
> Final code (for clearer understanding of where it may go wrong in 2.3.0) -
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>
>     table1_stream = (spark.readStream.format("kafka")
>                      .option("startingOffsets", "earliest")
>                      .option("kafka.bootstrap.servers", "localhost:9092")
>                      .option("subscribe", "test1").load())
>
>     query = table1_stream.select('value') \
>         .withColumn('value', table1_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
>         .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
>         .drop('value').writeStream.format("console").start()
>
>     time.sleep(10)
>
>     query.awaitTermination()
>
> # Code working in Spark 2.2.1
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
> # Code not working in Spark 2.3.0
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
> 2) I'm getting the below output as expected, from the above code in 2.2.1.
> My query is, is there a way to get the header of a file being read and
> ensure header=True? (Or is it that for Structured Streaming, user has to
> provide headers explicitly all the time, as data shall always come in this
> structure [for Kafka] - topic, partition, offset, key, value, timestamp,
> timestampType; if so, then how to remove column headers explicitly from the
> data, as in the below table) I know it is a stream, and the data is fed in
> as messages, but still wanted experts to put some more light into it.
>
> +---+----------+---------+
> | ID|First_Name|Last_Name|
> +---+----------+---------+
> | Hi|      null|     null|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> +---+----------+---------+
> only showing top 20 rows
>
>
> Any help?
>
> Thanks,
> Aakash.
>
> On Fri, Mar 16, 2018 at 12:54 PM, sagar grover <sagargrove...@gmail.com>
> wrote:
>
>>
>> With regards,
>> Sagar Grover
>> Phone - 7022175584
>>
>> On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu <aakash.spark@gmail.com
>> > wrote:
>>
>>> Awesome, thanks for detailing!
>>>
>>> Was thinking the same, we've to split by comma for csv while casting
>>> inside.
>>>
>>> Cool! Shall try it and revert back tomm.
>>>
>>> Thanks a ton!
>>>
>>> On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>>> wrote:
>>>
>>>> To remain generic, the KafkaSource can only offer the lowest common
>>>> denominator for a schema (topic, partition, offset, key, value, timestamp,
>>>> timestampType). As such, you can't just feed it a StructType. When you are
>>>> using a producer or consumer directly with Kafka, serialization and
>>>> deserialization is often an orthogonal and implicit transform. However, in
>>>> Spark, serialization and deserialization is an explicit transform (e.g.,
>>>> you define it in your query plan).
>>>>
>>>>
>>>> To make this more granular, if we imagine your source is registered as
>>>> a temp view named "foo":
>>>>
>>>> SELECT
>>>>
>>>>   split(cast(value as string), ',')[0] as id,
>>>>
>>>>   split(cast(value as string), ',')[1] as name
>>>>
>>>> FROM foo;
>>>>
>>>>
>>>>

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread Aakash Basu
plit(cast(value as string), ',')[1] as name
>>>
>>> FROM foo;
>>>
>>>
>>> Assuming you were providing the following messages to Kafka:
>>>
>>> 1,aakash
>>>
>>> 2,tathagata
>>>
>>> 3,chris
>>>
>>>
>>> You could make the query plan less repetitive. I don't believe Spark
>>> offers from_csv out of the box as an expression (although CSV is well
>>> supported as a data source). You could implement an expression by reusing a
>>> lot of the supporting CSV classes which may result in a better user
>>> experience vs. explicitly using split and array indices, etc. In this
>>> simple example, casting the binary to a string just works because there is
>>> a common understanding of string's encoded as bytes between Spark and Kafka
>>> by default.
>>>
>>>
>>> -Chris
>>> --
>>> *From:* Aakash Basu <aakash.spark@gmail.com>
>>> *Sent:* Thursday, March 15, 2018 10:48:45 AM
>>> *To:* Bowden, Chris
>>> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
>>>
>>> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>>>
>>> Hey Chris,
>>>
>>> You got it right. I'm reading a *csv *file from local as mentioned
>>> above, with a console producer on Kafka side.
>>>
>>> So, as it is a csv data with headers, shall I then use from_csv on the
>>> spark side and provide a StructType to shape it up with a schema and then
>>> cast it to string as TD suggested?
>>>
>>> I'm getting all of your points at a very high level. A little more
>>> granularity would help.
>>>
>>> *In the slide TD just shared*, PFA, I'm confused at the point where he
>>> is casting the value as string. Logically, the value shall consist of all
>>> the entire data set, so, suppose, I've a table with many columns, *how
>>> can I provide a single alias as he did in the groupBy. I missed it there
>>> itself. Another question is, do I have to cast in groupBy itself? Can't I
>>> do it directly in a select query? The last one, if the steps are followed,
>>> can I then run a SQL query on top of the columns separately?*
>>>
>>> Thanks,
>>> Aakash.
>>>
>>>
>>> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>>> wrote:
>>>
>>> You need to tell Spark about the structure of the data, it doesn't know
>>> ahead of time if you put avro, json, protobuf, etc. in kafka for the
>>> message format. If the messages are in json, Spark provides from_json out
>>> of the box. For a very simple POC you can happily cast the value to a
>>> string, etc. if you are prototyping and pushing messages by hand with a
>>> console producer on the kafka side.
>>>
>>> 
>>> From: Aakash Basu <aakash.spark@gmail.com>
>>> Sent: Thursday, March 15, 2018 7:52:28 AM
>>> To: Tathagata Das
>>> Cc: Dylan Guedes; Georg Heiler; user
>>> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>>>
>>> Hi,
>>>
>>> And if I run this below piece of code -
>>>
>>>
>>> from pyspark.sql import SparkSession
>>> import time
>>>
>>> class test:
>>>
>>>
>>> spark = SparkSession.builder \
>>> .appName("DirectKafka_Spark_Stream_Stream_Join") \
>>> .getOrCreate()
>>> # ssc = StreamingContext(spark, 20)
>>>
>>> table1_stream = 
>>> (spark.readStream.format("kafka").option("startingOffsets",
>>> "earliest").option("kafka.bootstrap.servers",
>>> "localhost:9092").option("subscribe", "test1").load())
>>>
>>> table2_stream = (
>>> spark.readStream.format("kafka").option("startingOffsets",
>>> "earliest").option("kafka.bootstrap.servers",
>>>
>>>   "localhost:9092").option("subscribe",
>>>
>>>"test2").load())
>>>
>>> joined_Stream = table1_stream.join(table2_stream, "Id")
>>> #
>>> # joined_Stream.show()
>>>
>>> # query =
>>> table1_stream.writeStream.format("console&qu

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread sagar grover
With regards,
Sagar Grover
Phone - 7022175584

On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Awesome, thanks for detailing!
>
> Was thinking the same, we've to split by comma for csv while casting
> inside.
>
> Cool! Shall try it and revert back tomm.
>
> Thanks a ton!
>
> On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
> wrote:
>
>> To remain generic, the KafkaSource can only offer the lowest common
>> denominator for a schema (topic, partition, offset, key, value, timestamp,
>> timestampType). As such, you can't just feed it a StructType. When you are
>> using a producer or consumer directly with Kafka, serialization and
>> deserialization is often an orthogonal and implicit transform. However, in
>> Spark, serialization and deserialization is an explicit transform (e.g.,
>> you define it in your query plan).
>>
>>
>> To make this more granular, if we imagine your source is registered as a
>> temp view named "foo":
>>
>> SELECT
>>
>>   split(cast(value as string), ',')[0] as id,
>>
>>   split(cast(value as string), ',')[1] as name
>>
>> FROM foo;
>>
>>
>> Assuming you were providing the following messages to Kafka:
>>
>> 1,aakash
>>
>> 2,tathagata
>>
>> 3,chris
>>
>>
>> You could make the query plan less repetitive. I don't believe Spark
>> offers from_csv out of the box as an expression (although CSV is well
>> supported as a data source). You could implement an expression by reusing a
>> lot of the supporting CSV classes which may result in a better user
>> experience vs. explicitly using split and array indices, etc. In this
>> simple example, casting the binary to a string just works because there is
>> a common understanding of string's encoded as bytes between Spark and Kafka
>> by default.
>>
>>
>> -Chris
>> --
>> *From:* Aakash Basu <aakash.spark@gmail.com>
>> *Sent:* Thursday, March 15, 2018 10:48:45 AM
>> *To:* Bowden, Chris
>> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
>>
>> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>>
>> Hey Chris,
>>
>> You got it right. I'm reading a *csv *file from local as mentioned
>> above, with a console producer on Kafka side.
>>
>> So, as it is a csv data with headers, shall I then use from_csv on the
>> spark side and provide a StructType to shape it up with a schema and then
>> cast it to string as TD suggested?
>>
>> I'm getting all of your points at a very high level. A little more
>> granularity would help.
>>
>> *In the slide TD just shared*, PFA, I'm confused at the point where he
>> is casting the value as string. Logically, the value shall consist of all
>> the entire data set, so, suppose, I've a table with many columns, *how
>> can I provide a single alias as he did in the groupBy. I missed it there
>> itself. Another question is, do I have to cast in groupBy itself? Can't I
>> do it directly in a select query? The last one, if the steps are followed,
>> can I then run a SQL query on top of the columns separately?*
>>
>> Thanks,
>> Aakash.
>>
>>
>> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>> wrote:
>>
>> You need to tell Spark about the structure of the data, it doesn't know
>> ahead of time if you put avro, json, protobuf, etc. in kafka for the
>> message format. If the messages are in json, Spark provides from_json out
>> of the box. For a very simple POC you can happily cast the value to a
>> string, etc. if you are prototyping and pushing messages by hand with a
>> console producer on the kafka side.
>>
>> 
>> From: Aakash Basu <aakash.spark@gmail.com>
>> Sent: Thursday, March 15, 2018 7:52:28 AM
>> To: Tathagata Das
>> Cc: Dylan Guedes; Georg Heiler; user
>> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>>
>> Hi,
>>
>> And if I run this below piece of code -
>>
>>
>> from pyspark.sql import SparkSession
>> import time
>>
>> class test:
>>
>>
>> spark = SparkSession.builder \
>> .appName("DirectKafka_Spark_Stream_Stream_Join") \
>> .getOrCreate()
>> # ssc = StreamingContext(spark, 20)
>>
>> table1_stream = 
>> (spark.readStream.format("kafka").option("

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Awesome, thanks for detailing!

I was thinking the same: we have to split by comma for CSV while casting inside.

Cool! I shall try it and get back to you tomorrow.

Thanks a ton!

On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
wrote:

> To remain generic, the KafkaSource can only offer the lowest common
> denominator for a schema (topic, partition, offset, key, value, timestamp,
> timestampType). As such, you can't just feed it a StructType. When you are
> using a producer or consumer directly with Kafka, serialization and
> deserialization is often an orthogonal and implicit transform. However, in
> Spark, serialization and deserialization is an explicit transform (e.g.,
> you define it in your query plan).
>
>
> To make this more granular, if we imagine your source is registered as a
> temp view named "foo":
>
> SELECT
>
>   split(cast(value as string), ',')[0] as id,
>
>   split(cast(value as string), ',')[1] as name
>
> FROM foo;
>
>
> Assuming you were providing the following messages to Kafka:
>
> 1,aakash
>
> 2,tathagata
>
> 3,chris
>
>
> You could make the query plan less repetitive. I don't believe Spark
> offers from_csv out of the box as an expression (although CSV is well
> supported as a data source). You could implement an expression by reusing a
> lot of the supporting CSV classes which may result in a better user
> experience vs. explicitly using split and array indices, etc. In this
> simple example, casting the binary to a string just works because there is
> a common understanding of string's encoded as bytes between Spark and Kafka
> by default.
>
>
> -Chris
> --
> *From:* Aakash Basu <aakash.spark@gmail.com>
> *Sent:* Thursday, March 15, 2018 10:48:45 AM
> *To:* Bowden, Chris
> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hey Chris,
>
> You got it right. I'm reading a *csv *file from local as mentioned above,
> with a console producer on Kafka side.
>
> So, as it is a csv data with headers, shall I then use from_csv on the
> spark side and provide a StructType to shape it up with a schema and then
> cast it to string as TD suggested?
>
> I'm getting all of your points at a very high level. A little more
> granularity would help.
>
> *In the slide TD just shared*, PFA, I'm confused at the point where he is
> casting the value as string. Logically, the value shall consist of all the
> entire data set, so, suppose, I've a table with many columns, *how can I
> provide a single alias as he did in the groupBy. I missed it there itself.
> Another question is, do I have to cast in groupBy itself? Can't I do it
> directly in a select query? The last one, if the steps are followed, can I
> then run a SQL query on top of the columns separately?*
>
> Thanks,
> Aakash.
>
>
> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
> wrote:
>
> You need to tell Spark about the structure of the data, it doesn't know
> ahead of time if you put avro, json, protobuf, etc. in kafka for the
> message format. If the messages are in json, Spark provides from_json out
> of the box. For a very simple POC you can happily cast the value to a
> string, etc. if you are prototyping and pushing messages by hand with a
> console producer on the kafka side.
>
> 
> From: Aakash Basu <aakash.spark@gmail.com>
> Sent: Thursday, March 15, 2018 7:52:28 AM
> To: Tathagata Das
> Cc: Dylan Guedes; Georg Heiler; user
> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi,
>
> And if I run this below piece of code -
>
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \
> .getOrCreate()
> # ssc = StreamingContext(spark, 20)
>
> table1_stream = 
> (spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers",
> "localhost:9092").option("subscribe", "test1").load())
>
> table2_stream = (
> spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers",
>
> "localhost:9092").option("subscribe",
>
>  "test2").load())
>
> joined_Stream = table1_stream.join(table2_stream, "Id")
> #
> # jo

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Hey Chris,

You got it right. I'm reading a CSV file from local disk, as mentioned above,
with a console producer on the Kafka side.

So, as it is CSV data with headers, shall I then use from_csv on the
Spark side and provide a StructType to shape it with a schema, and then
cast it to string as TD suggested?

I'm getting all of your points at a very high level. A little more
granularity would help.

In the slide TD just shared (PFA), I'm confused at the point where he is
casting the value as string. Logically, the value consists of the entire
data set, so, suppose I have a table with many columns: how can I
provide a single alias as he did in the groupBy? I missed it there.
Another question: do I have to cast in the groupBy itself? Can't I do it
directly in a select query? Lastly, if these steps are followed, can I
then run a SQL query on top of the columns separately?

Thanks,
Aakash.


On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com> wrote:

You need to tell Spark about the structure of the data, it doesn't know
ahead of time if you put avro, json, protobuf, etc. in kafka for the
message format. If the messages are in json, Spark provides from_json out
of the box. For a very simple POC you can happily cast the value to a
string, etc. if you are prototyping and pushing messages by hand with a
console producer on the kafka side.


From: Aakash Basu <aakash.spark@gmail.com>
Sent: Thursday, March 15, 2018 7:52:28 AM
To: Tathagata Das
Cc: Dylan Guedes; Georg Heiler; user
Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query

Hi,

And if I run this below piece of code -


from pyspark.sql import SparkSession
import time

class test:


    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()
    # ssc = StreamingContext(spark, 20)

    table1_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load())

    table2_stream = (
        spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers",
            "localhost:9092").option("subscribe",
            "test2").load())

    joined_Stream = table1_stream.join(table2_stream, "Id")
    #
    # joined_Stream.show()

    # query = table1_stream.writeStream.format("console").start().awaitTermination()
    # .queryName("table_A").format("memory")
    # spark.sql("select * from table_A").show()
    time.sleep(10)  # sleep 10 seconds
    # query.stop()
    # query


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py




I get the below error (in Spark 2.3.0) -

Traceback (most recent call last):
  File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 4, in <module>
    class test:
  File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 19, in test
    joined_Stream = table1_stream.join(table2_stream, "Id")
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be resolved on the left side of the join. The left-side columns: [key, value, topic, partition, offset, timestamp, timestampType];'

It seems that, as per the documentation, the key and value are deserialized as
byte arrays.
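
As a small illustration of what those byte arrays hold: the console output quoted elsewhere in this thread prints the value column as hex bytes such as 69 64 2C 66 69 7... The plain-Python sketch below decodes two of those prefixes. The helper name decode_hex_dump is made up for this note; only the byte values come from the thread.

```python
def decode_hex_dump(hex_dump):
    """Decode space-separated hex bytes (as printed by the console sink) as UTF-8 text."""
    return bytes.fromhex(hex_dump).decode("utf-8")


# Byte prefixes copied from the console output quoted in this thread.
header_prefix = decode_hex_dump("69 64 2C 66 69")
first_row_prefix = decode_hex_dump("31 2C 4B 65 6C")

print(header_prefix)     # id,fi
print(first_row_prefix)  # 1,Kel
```

So each Kafka value appears to be one raw CSV line, which is why no Id column exists on the streaming DataFrame until the value has been parsed into columns.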

I am badly stuck at this step, and there is not much material online with
steps to proceed on this, either.

Any help, guys?

Thanks,
Aakash.


On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark@gmail.com> wrote:
Any help on the above?

On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark@gmail.com> wrote:
Hi,

I have made some progress on the topic mentioned above -

1) I am feeding a CSV file into the Kafka topic.
2) I am reading the Kafka topic with readStream, as TD's article suggests.
3) Then I simply try to show the streaming dataframe, using
queryName('XYZ') in the writeStream and writing a SQL query on top of it,
but that doesn't show anything.
4) Once all the above problems are resolved, I want to perform a
stream-stream join.
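
On step 3, one likely explanation: as far as I know, a streaming result can be queried by name through spark.sql only when the query writes to the memory sink. The console sink just prints each batch, and chaining awaitTermination() directly onto start() blocks before any later spark.sql call can run. A minimal sketch, assuming a streaming DataFrame and a SparkSession already exist (the helper names start_memory_table and show_table are invented for this note):

```python
def start_memory_table(stream_df, table_name):
    """Start a streaming query whose results land in a queryable in-memory table."""
    return (
        stream_df.writeStream
        .format("memory")        # the memory sink keeps results in an in-memory table
        .queryName(table_name)   # the table is named after the query
        .outputMode("append")
        .start()                 # returns a StreamingQuery without blocking
    )


def show_table(spark, query, table_name):
    """Wait for the data available so far, then print the in-memory table."""
    query.processAllAvailable()
    spark.sql(f"SELECT * FROM {table_name}").show()
```

Usage would be along the lines of query = start_memory_table(table1_stream, "table_A") followed by show_table(spark, query, "table_A"). The memory sink holds everything on the driver, so it is meant for small debugging runs.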

The CSV file I'm in

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Tathagata Das
Chris identified the problem correctly. You need to parse out the json text
from Kafka into separate columns before you can join them up.
I walk through an example of this in my slides -
https://www.slideshare.net/databricks/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-with-tathagata-das
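
For the CSV lines used in this thread, the parsing step might look like the sketch below. It assumes each Kafka value is one comma-separated line with the fields id, first_name, last_name (taken from the sample file mentioned in the thread) and that no field contains an embedded comma. The helper names csv_select_exprs and parse_csv_stream are invented for illustration. For JSON payloads, from_json with an explicit schema would take the place of split.

```python
def csv_select_exprs(column_names, source="value"):
    """Build Spark SQL expressions that split a CSV line into named string columns."""
    return [
        f"split(CAST({source} AS STRING), ',')[{index}] AS {name}"
        for index, name in enumerate(column_names)
    ]


def parse_csv_stream(kafka_df, column_names):
    """Apply the expressions to a DataFrame read with format("kafka")."""
    return kafka_df.selectExpr(*csv_select_exprs(column_names))


for expression in csv_select_exprs(["id", "first_name", "last_name"]):
    print(expression)
# split(CAST(value AS STRING), ',')[0] AS id
# split(CAST(value AS STRING), ',')[1] AS first_name
# split(CAST(value AS STRING), ',')[2] AS last_name
```

Once both streams have been passed through parse_csv_stream, each has an id column that a join can resolve. Note that the CSV header line arrives as an ordinary message, so it would need to be filtered out.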


On Thu, Mar 15, 2018 at 8:37 AM, Bowden, Chris <chris.bow...@microfocus.com>
wrote:

> You need to tell Spark about the structure of the data, it doesn't know
> ahead of time if you put avro, json, protobuf, etc. in kafka for the
> message format. If the messages are in json, Spark provides from_json out
> of the box. For a very simple POC you can happily cast the value to a
> string, etc. if you are prototyping and pushing messages by hand with a
> console producer on the kafka side.
>
> 
> From: Aakash Basu <aakash.spark@gmail.com>
> Sent: Thursday, March 15, 2018 7:52:28 AM
> To: Tathagata Das
> Cc: Dylan Guedes; Georg Heiler; user
> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi,
>
> And if I run this below piece of code -
>
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \
> .getOrCreate()
> # ssc = StreamingContext(spark, 20)
>
> table1_stream = 
> (spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe",
> "test1").load())
>
> table2_stream = (
> spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers",
>
> "localhost:9092").option("subscribe",
>
>  "test2").load())
>
> joined_Stream = table1_stream.join(table2_stream, "Id")
> #
> # joined_Stream.show()
>
> # query =
> table1_stream.writeStream.format("console").start().awaitTermination()
> # .queryName("table_A").format("memory")
> # spark.sql("select * from table_A").show()
> time.sleep(10)  # sleep 20 seconds
> # query.stop()
> # query
>
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
> Stream_Stream_Join.py
>
>
>
>
> I get the below error (in Spark 2.3.0) -
>
> Traceback (most recent call last):
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/
> Kafka_Spark/Stream_Stream_Join.py", line 4, in 
> class test:
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/
> Kafka_Spark/Stream_Stream_Join.py", line 19, in test
> joined_Stream = table1_stream.join(table2_stream, "Id")
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
> pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be
> resolved on the left side of the join. The left-side columns: [key, value,
> topic, partition, offset, timestamp, timestampType];'
>
> Seems, as per the documentation, they key and value are deserialized as
> byte arrays.
>
> I am badly stuck at this step, not many materials online, with steps to
> proceed on this, too.
>
> Any help, guys?
>
> Thanks,
> Aakash.
>
>
> On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark@gmail.com> wrote:
> Any help on the above?
>
> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark@gmail.com> wrote:
> Hi,
>
> I progressed a bit in the above mentioned topic -
>
> 1) I am feeding a CSV file into the Kafka topic.
> 2) Feeding the Kafka topic as readStream as TD's article suggests.
> 3) Then, simply trying to do a show on the streaming dataframe, using
> queryName('XYZ') in the writeStream and writing a sql query on top of it,
> but that doesn't show anything.
> 4) Once all the above problems are resolved, I want to perform a
> stream-stream join.
>
> The CSV file I'm ingesting into Kafka has -
>
> id,first_name,last_name
>

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
>> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
>> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 
>> Stream_Stream_Join.py
>>
>>
>> The output I'm getting (whereas I simply want to show() my dataframe) -
>>
>> +----+--------------------+-----+---------+------+--------------------+-------------+
>> | key|               value|topic|partition|offset|           timestamp|timestampType|
>> +----+--------------------+-----+---------+------+--------------------+-------------+
>> |null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
>> |null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
>> |null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
>> |null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
>> |null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
>> |null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
>> +----+--------------------+-----+---------+------+--------------------+-------------+
>>
>> 18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
>>   "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
>>   "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
>>   "name" : "table_A",
>>   "timestamp" : "2018-03-15T10:18:07.218Z",
>>   "numInputRows" : 6,
>>   "inputRowsPerSecond" : 461.53846153846155,
>>   "processedRowsPerSecond" : 14.634146341463415,
>>   "durationMs" : {
>> "addBatch" : 241,
>> "getBatch" : 15,
>> "getOffset" : 2,
>> "queryPlanning" : 2,
>> "triggerExecution" : 410,
>> "walCommit" : 135
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>> "description" : "KafkaSource[Subscribe[test1]]",
>> "startOffset" : {
>>   "test1" : {
>> "0" : 5226
>>   }
>> },
>> "endOffset" : {
>>   "test1" : {
>> "0" : 5232
>>   }
>> },
>> "numInputRows" : 6,
>> "inputRowsPerSecond" : 461.53846153846155,
>> "processedRowsPerSecond" : 14.634146341463415
>>   } ],
>>   "sink" : {
>> "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
>>   }
>> }
>>
>> P.S - If I add the below piece in the code, it doesn't print a DF of the
>> actual table.
>>
>> spark.sql("select * from table_A").show()
>>
>>
>> Any help?
>>
>>
>> Thanks,
>> Aakash.
>>
>> On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <aakash.spark@gmail.com
>> > wrote:
>>
>>> Thanks to TD, the savior!
>>>
>>> Shall look into it.
>>>
>>> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>>>>
>>>> This is true stream-stream join which will automatically buffer delayed
>>>> data and appropriately join stuff with SQL join semantics. Please check it
>>>> out :)
>>>>
>>>> TD
>>>>
>>>>
>>>>
>>>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com>
>>>> wrote:
>>>>
>>>>> I misread it, and thought that you question was if pyspark supports
>>>>> kafka lol. Sorry!
>>>>>
>>>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <
>>>>> aakash.spark@gmail.com> wrote:
>>>>>
>>>>>> Hey Dylan,
>>>>>>
>>>>>> Great!
>>>>>>
>>>>>> Can you revert back to my initial and also the latest mail?
>>>>>>
>>>>>> Thanks,
>>>>>> Aakash.
>>>>>>
>>>>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I've been using the Kafka with pyspark since 2.1.
>>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
>>>>>>> aakash.spark@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm yet to.
>>>>>>>>
>>>>>>>> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark
>>>>>>>> Package allows Python? I read somewhere, as of now Scala and Java are 
>>>>>>>> the
>>>>>>>> languages to be used.
>>>>>>>>
>>>>>>>> Please correct me if am wrong.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Aakash.
>>>>>>>>
>>>>>>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Did you try spark 2.3 with structured streaming? There
>>>>>>>>> watermarking and plain sql might be really interesting for you.
>>>>>>>>> Aakash Basu <aakash.spark@gmail.com> wrote on Wed, 14 March
>>>>>>>>> 2018 at 14:57:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>>>>>>>>
>>>>>>>>>> *Spark 2.2.1*
>>>>>>>>>> *Kafka 1.0.1*
>>>>>>>>>>
>>>>>>>>>> As of now, I am feeding paragraphs in Kafka console producer and
>>>>>>>>>> my Spark, which is acting as a receiver is printing the flattened 
>>>>>>>>>> words,
>>>>>>>>>> which is a complete RDD operation.
>>>>>>>>>>
>>>>>>>>>> *My motive is to read two tables continuously (being updated) as
>>>>>>>>>> two distinct Kafka topics being read as two Spark Dataframes and 
>>>>>>>>>> join them
>>>>>>>>>> based on a key and produce the output. *(I am from Spark-SQL
>>>>>>>>>> background, pardon my Spark-SQL-ish writing)
>>>>>>>>>>
>>>>>>>>>> *It may happen, the first topic is receiving new data 15 mins
>>>>>>>>>> prior to the second topic, in that scenario, how to proceed? I 
>>>>>>>>>> should not
>>>>>>>>>> lose any data.*
>>>>>>>>>>
>>>>>>>>>> As of now, I want to simply pass paragraphs, read them as RDD,
>>>>>>>>>> convert to DF and then join to get the common keys as the output. 
>>>>>>>>>> (Just for
>>>>>>>>>> R).
>>>>>>>>>>
>>>>>>>>>> Started using Spark Streaming and Kafka today itself.
>>>>>>>>>>
>>>>>>>>>> Please help!
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Aakash.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
>
> Thanks,
> Aakash.
>
> On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Thanks to TD, the savior!
>>
>> Shall look into it.
>>
>> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>>>
>>> This is true stream-stream join which will automatically buffer delayed
>>> data and appropriately join stuff with SQL join semantics. Please check it
>>> out :)
>>>
>>> TD
>>>
>>>
>>>
>>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com>
>>> wrote:
>>>
>>>> I misread it, and thought that you question was if pyspark supports
>>>> kafka lol. Sorry!
>>>>
>>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <
>>>> aakash.spark....@gmail.com> wrote:
>>>>
>>>>> Hey Dylan,
>>>>>
>>>>> Great!
>>>>>
>>>>> Can you revert back to my initial and also the latest mail?
>>>>>
>>>>> Thanks,
>>>>> Aakash.
>>>>>
>>>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I've been using the Kafka with pyspark since 2.1.
>>>>>>
>>>>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
>>>>>> aakash.spark@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm yet to.
>>>>>>>
>>>>>>> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
>>>>>>> allows Python? I read somewhere, as of now Scala and Java are the 
>>>>>>> languages
>>>>>>> to be used.
>>>>>>>
>>>>>>> Please correct me if am wrong.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Aakash.
>>>>>>>
>>>>>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Did you try spark 2.3 with structured streaming? There watermarking
>>>>>>>> and plain sql might be really interesting for you.
>>>>>>>> Aakash Basu <aakash.spark@gmail.com> wrote on Wed, 14 March
>>>>>>>> 2018 at 14:57:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>>>>>>>
>>>>>>>>> *Spark 2.2.1*
>>>>>>>>> *Kafka 1.0.1*
>>>>>>>>>
>>>>>>>>> As of now, I am feeding paragraphs in Kafka console producer and
>>>>>>>>> my Spark, which is acting as a receiver is printing the flattened 
>>>>>>>>> words,
>>>>>>>>> which is a complete RDD operation.
>>>>>>>>>
>>>>>>>>> *My motive is to read two tables continuously (being updated) as
>>>>>>>>> two distinct Kafka topics being read as two Spark Dataframes and join 
>>>>>>>>> them
>>>>>>>>> based on a key and produce the output. *(I am from Spark-SQL
>>>>>>>>> background, pardon my Spark-SQL-ish writing)
>>>>>>>>>
>>>>>>>>> *It may happen, the first topic is receiving new data 15 mins
>>>>>>>>> prior to the second topic, in that scenario, how to proceed? I should 
>>>>>>>>> not
>>>>>>>>> lose any data.*
>>>>>>>>>
>>>>>>>>> As of now, I want to simply pass paragraphs, read them as RDD,
>>>>>>>>> convert to DF and then join to get the common keys as the output. 
>>>>>>>>> (Just for
>>>>>>>>> R).
>>>>>>>>>
>>>>>>>>> Started using Spark Streaming and Kafka today itself.
>>>>>>>>>
>>>>>>>>> Please help!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Aakash.
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>>
>>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
>>> I misread it, and thought that you question was if pyspark supports
>>> kafka lol. Sorry!
>>>
>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark@gmail.com
>>> > wrote:
>>>
>>>> Hey Dylan,
>>>>
>>>> Great!
>>>>
>>>> Can you revert back to my initial and also the latest mail?
>>>>
>>>> Thanks,
>>>> Aakash.
>>>>
>>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I've been using the Kafka with pyspark since 2.1.
>>>>>
>>>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
>>>>> aakash.spark@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm yet to.
>>>>>>
>>>>>> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
>>>>>> allows Python? I read somewhere, as of now Scala and Java are the 
>>>>>> languages
>>>>>> to be used.
>>>>>>
>>>>>> Please correct me if am wrong.
>>>>>>
>>>>>> Thanks,
>>>>>> Aakash.
>>>>>>
>>>>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Did you try spark 2.3 with structured streaming? There watermarking
>>>>>>> and plain sql might be really interesting for you.
>>>>>>> Aakash Basu <aakash.spark@gmail.com> wrote on Wed, 14 March
>>>>>>> 2018 at 14:57:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>>>>>>
>>>>>>>> *Spark 2.2.1*
>>>>>>>> *Kafka 1.0.1*
>>>>>>>>
>>>>>>>> As of now, I am feeding paragraphs in Kafka console producer and my
>>>>>>>> Spark, which is acting as a receiver is printing the flattened words, 
>>>>>>>> which
>>>>>>>> is a complete RDD operation.
>>>>>>>>
>>>>>>>> *My motive is to read two tables continuously (being updated) as
>>>>>>>> two distinct Kafka topics being read as two Spark Dataframes and join 
>>>>>>>> them
>>>>>>>> based on a key and produce the output. *(I am from Spark-SQL
>>>>>>>> background, pardon my Spark-SQL-ish writing)
>>>>>>>>
>>>>>>>> *It may happen, the first topic is receiving new data 15 mins prior
>>>>>>>> to the second topic, in that scenario, how to proceed? I should not 
>>>>>>>> lose
>>>>>>>> any data.*
>>>>>>>>
>>>>>>>> As of now, I want to simply pass paragraphs, read them as RDD,
>>>>>>>> convert to DF and then join to get the common keys as the output. 
>>>>>>>> (Just for
>>>>>>>> R).
>>>>>>>>
>>>>>>>> Started using Spark Streaming and Kafka today itself.
>>>>>>>>
>>>>>>>> Please help!
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Aakash.
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Aakash Basu
Thanks to TD, the savior!

Shall look into it.

On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>
> This is true stream-stream join which will automatically buffer delayed
> data and appropriately join stuff with SQL join semantics. Please check it
> out :)
>
> TD
>
>
>
> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com>
> wrote:
>
>> I misread it, and thought that you question was if pyspark supports kafka
>> lol. Sorry!
>>
>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark@gmail.com>
>> wrote:
>>
>>> Hey Dylan,
>>>
>>> Great!
>>>
>>> Can you revert back to my initial and also the latest mail?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I've been using the Kafka with pyspark since 2.1.
>>>>
>>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
>>>> aakash.spark@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm yet to.
>>>>>
>>>>> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
>>>>> allows Python? I read somewhere, as of now Scala and Java are the 
>>>>> languages
>>>>> to be used.
>>>>>
>>>>> Please correct me if am wrong.
>>>>>
>>>>> Thanks,
>>>>> Aakash.
>>>>>
>>>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Did you try spark 2.3 with structured streaming? There watermarking
>>>>>> and plain sql might be really interesting for you.
>>>>>> Aakash Basu <aakash.spark@gmail.com> wrote on Wed, 14 March
>>>>>> 2018 at 14:57:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>>>>>
>>>>>>> *Spark 2.2.1*
>>>>>>> *Kafka 1.0.1*
>>>>>>>
>>>>>>> As of now, I am feeding paragraphs in Kafka console producer and my
>>>>>>> Spark, which is acting as a receiver is printing the flattened words, 
>>>>>>> which
>>>>>>> is a complete RDD operation.
>>>>>>>
>>>>>>> *My motive is to read two tables continuously (being updated) as two
>>>>>>> distinct Kafka topics being read as two Spark Dataframes and join them
>>>>>>> based on a key and produce the output. *(I am from Spark-SQL
>>>>>>> background, pardon my Spark-SQL-ish writing)
>>>>>>>
>>>>>>> *It may happen, the first topic is receiving new data 15 mins prior
>>>>>>> to the second topic, in that scenario, how to proceed? I should not lose
>>>>>>> any data.*
>>>>>>>
>>>>>>> As of now, I want to simply pass paragraphs, read them as RDD,
>>>>>>> convert to DF and then join to get the common keys as the output. (Just 
>>>>>>> for
>>>>>>> R).
>>>>>>>
>>>>>>> Started using Spark Streaming and Kafka today itself.
>>>>>>>
>>>>>>> Please help!
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Aakash.
>>>>>>>
>>>>>>
>>>>
>>
>


Re: How to start practicing Python Spark Streaming in Linux?

2018-03-14 Thread Felix Cheung
It’s best to start with Structured Streaming

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#tab_python_0

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#tab_python_0

_
From: Aakash Basu <aakash.spark@gmail.com>
Sent: Wednesday, March 14, 2018 1:09 AM
Subject: How to start practicing Python Spark Streaming in Linux?
To: user <user@spark.apache.org>


Hi all,

Is there any guide on how to kick-start learning PySpark Streaming on a standalone
Ubuntu system? Something step-wise, practical and hands-on would be great.

Also, on connecting Kafka with Spark, getting real-time data and processing it
in micro-batches...

Any help?

Thanks,
Aakash.




Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Tathagata Das
Relevant:
https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html


This is a true stream-stream join, which will automatically buffer delayed
data and appropriately join it with SQL join semantics. Please check it
out :)

TD
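
A sketch of how such a join could be set up in PySpark for the two-topic case described in this thread, assuming Spark 2.3 or later and that each parsed stream carries an event-time column. The column names (a_id, a_time, b_id, b_time), the helper names, and the 15-minute bound (borrowed from the delay mentioned in the original question) are illustrative assumptions, not something the blog post prescribes.

```python
def time_bounded_condition(left_key, left_time, right_key, right_time, max_gap):
    """SQL join condition: equal keys and event times at most max_gap apart."""
    return (
        f"{left_key} = {right_key} AND "
        f"{right_time} >= {left_time} - interval {max_gap} AND "
        f"{right_time} <= {left_time} + interval {max_gap}"
    )


def join_with_watermarks(left_df, right_df, condition, left_time, right_time, delay):
    """Inner-join two streaming DataFrames, bounding how long unmatched rows are buffered."""
    # Imported here so the condition helper above runs without Spark installed.
    from pyspark.sql.functions import expr

    left = left_df.withWatermark(left_time, delay)
    right = right_df.withWatermark(right_time, delay)
    return left.join(right, expr(condition))


condition = time_bounded_condition("a_id", "a_time", "b_id", "b_time", "15 minutes")
print(condition)
```

Rows that arrive later than the watermark delay allows can be dropped from the join state, so if no data may be lost, the delay and the time bound need to be chosen generously. For inner joins the watermarks are optional, at the cost of join state that keeps growing.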



On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com> wrote:

> I misread it, and thought that you question was if pyspark supports kafka
> lol. Sorry!
>
> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hey Dylan,
>>
>> Great!
>>
>> Can you revert back to my initial and also the latest mail?
>>
>> Thanks,
>> Aakash.
>>
>> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I've been using the Kafka with pyspark since 2.1.
>>>
>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <aakash.spark@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm yet to.
>>>>
>>>> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
>>>> allows Python? I read somewhere, as of now Scala and Java are the languages
>>>> to be used.
>>>>
>>>> Please correct me if am wrong.
>>>>
>>>> Thanks,
>>>> Aakash.
>>>>
>>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com>
>>>> wrote:
>>>>
>>>>> Did you try spark 2.3 with structured streaming? There watermarking
>>>>> and plain sql might be really interesting for you.
>>>>> Aakash Basu <aakash.spark@gmail.com> wrote on Wed, 14 March 2018
>>>>> at 14:57:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>>>>
>>>>>> *Spark 2.2.1*
>>>>>> *Kafka 1.0.1*
>>>>>>
>>>>>> As of now, I am feeding paragraphs in Kafka console producer and my
>>>>>> Spark, which is acting as a receiver is printing the flattened words, 
>>>>>> which
>>>>>> is a complete RDD operation.
>>>>>>
>>>>>> *My motive is to read two tables continuously (being updated) as two
>>>>>> distinct Kafka topics being read as two Spark Dataframes and join them
>>>>>> based on a key and produce the output. *(I am from Spark-SQL
>>>>>> background, pardon my Spark-SQL-ish writing)
>>>>>>
>>>>>> *It may happen, the first topic is receiving new data 15 mins prior
>>>>>> to the second topic, in that scenario, how to proceed? I should not lose
>>>>>> any data.*
>>>>>>
>>>>>> As of now, I want to simply pass paragraphs, read them as RDD,
>>>>>> convert to DF and then join to get the common keys as the output. (Just 
>>>>>> for
>>>>>> R).
>>>>>>
>>>>>> Started using Spark Streaming and Kafka today itself.
>>>>>>
>>>>>> Please help!
>>>>>>
>>>>>> Thanks,
>>>>>> Aakash.
>>>>>>
>>>>>
>>>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Dylan Guedes
I misread it, and thought that your question was whether PySpark supports
Kafka lol. Sorry!

On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Hey Dylan,
>
> Great!
>
> Can you revert back to my initial and also the latest mail?
>
> Thanks,
> Aakash.
>
> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>
>> Hi,
>>
>> I've been using the Kafka with pyspark since 2.1.
>>
>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <aakash.spark@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm yet to.
>>>
>>> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
>>> allows Python? I read somewhere, as of now Scala and Java are the languages
>>> to be used.
>>>
>>> Please correct me if am wrong.
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com>
>>> wrote:
>>>
>>>> Did you try spark 2.3 with structured streaming? There watermarking and
>>>> plain sql might be really interesting for you.
>>>> Aakash Basu <aakash.spark@gmail.com> wrote on Wed, 14 March 2018
>>>> at 14:57:
>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>>
>>>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>>>
>>>>> *Spark 2.2.1*
>>>>> *Kafka 1.0.1*
>>>>>
>>>>> As of now, I am feeding paragraphs in Kafka console producer and my
>>>>> Spark, which is acting as a receiver is printing the flattened words, 
>>>>> which
>>>>> is a complete RDD operation.
>>>>>
>>>>> *My motive is to read two tables continuously (being updated) as two
>>>>> distinct Kafka topics being read as two Spark Dataframes and join them
>>>>> based on a key and produce the output. *(I am from Spark-SQL
>>>>> background, pardon my Spark-SQL-ish writing)
>>>>>
>>>>> *It may happen, the first topic is receiving new data 15 mins prior to
>>>>> the second topic, in that scenario, how to proceed? I should not lose any
>>>>> data.*
>>>>>
>>>>> As of now, I want to simply pass paragraphs, read them as RDD, convert
>>>>> to DF and then join to get the common keys as the output. (Just for R).
>>>>>
>>>>> Started using Spark Streaming and Kafka today itself.
>>>>>
>>>>> Please help!
>>>>>
>>>>> Thanks,
>>>>> Aakash.
>>>>>
>>>>
>>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Aakash Basu
Hey Dylan,

Great!

Can you reply to my initial mail and also to the latest one?

Thanks,
Aakash.

On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:

> Hi,
>
> I've been using the Kafka with pyspark since 2.1.
>
> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm yet to.
>>
>> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
>> allows Python? I read somewhere, as of now Scala and Java are the languages
>> to be used.
>>
>> Please correct me if am wrong.
>>
>> Thanks,
>> Aakash.
>>
>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com> wrote:
>>
>>> Did you try spark 2.3 with structured streaming? There watermarking and
>>> plain sql might be really interesting for you.
>>> Aakash Basu <aakash.spark@gmail.com> wrote on Wed, 14 March 2018
>>> at 14:57:
>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>>
>>>> *Spark 2.2.1*
>>>> *Kafka 1.0.1*
>>>>
>>>> As of now, I am feeding paragraphs in Kafka console producer and my
>>>> Spark, which is acting as a receiver is printing the flattened words, which
>>>> is a complete RDD operation.
>>>>
>>>> *My motive is to read two tables continuously (being updated) as two
>>>> distinct Kafka topics being read as two Spark Dataframes and join them
>>>> based on a key and produce the output. *(I am from Spark-SQL
>>>> background, pardon my Spark-SQL-ish writing)
>>>>
>>>> *It may happen, the first topic is receiving new data 15 mins prior to
>>>> the second topic, in that scenario, how to proceed? I should not lose any
>>>> data.*
>>>>
>>>> As of now, I want to simply pass paragraphs, read them as RDD, convert
>>>> to DF and then join to get the common keys as the output. (Just for R).
>>>>
>>>> Started using Spark Streaming and Kafka today itself.
>>>>
>>>> Please help!
>>>>
>>>> Thanks,
>>>> Aakash.
>>>>
>>>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Dylan Guedes
Hi,

I've been using Kafka with PySpark since 2.1.

On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Hi,
>
> I'm yet to.
>
> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
> allows Python? I read somewhere, as of now Scala and Java are the languages
> to be used.
>
> Please correct me if am wrong.
>
> Thanks,
> Aakash.
>
> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com> wrote:
>
>> Did you try spark 2.3 with structured streaming? There watermarking and
>> plain sql might be really interesting for you.
>> Aakash Basu <aakash.spark@gmail.com> wrote on Wed, 14 March 2018 at
>> 14:57:
>>
>>> Hi,
>>>
>>>
>>>
>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>
>>> *Spark 2.2.1*
>>> *Kafka 1.0.1*
>>>
>>> As of now, I am feeding paragraphs in Kafka console producer and my
>>> Spark, which is acting as a receiver is printing the flattened words, which
>>> is a complete RDD operation.
>>>
>>> *My motive is to read two tables continuously (being updated) as two
>>> distinct Kafka topics being read as two Spark Dataframes and join them
>>> based on a key and produce the output. *(I am from Spark-SQL
>>> background, pardon my Spark-SQL-ish writing)
>>>
>>> *It may happen, the first topic is receiving new data 15 mins prior to
>>> the second topic, in that scenario, how to proceed? I should not lose any
>>> data.*
>>>
>>> As of now, I want to simply pass paragraphs, read them as RDD, convert
>>> to DF and then join to get the common keys as the output. (Just for R).
>>>
>>> Started using Spark Streaming and Kafka today itself.
>>>
>>> Please help!
>>>
>>> Thanks,
>>> Aakash.
>>>
>>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Aakash Basu
Hi,

I'm yet to.

I just want to know, when will Spark 2.3 with the 0.10 Kafka Spark package allow
Python? I read somewhere that, as of now, Scala and Java are the languages to
be used.

Please correct me if I am wrong.

Thanks,
Aakash.

On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com> wrote:

> Did you try spark 2.3 with structured streaming? There watermarking and
> plain sql might be really interesting for you.
> Aakash Basu <aakash.spark@gmail.com> wrote on Wed, 14 March 2018 at
> 14:57:
>
>> Hi,
>>
>>
>>
>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>
>> *Spark 2.2.1*
>> *Kafka 1.0.1*
>>
>> As of now, I am feeding paragraphs in Kafka console producer and my
>> Spark, which is acting as a receiver is printing the flattened words, which
>> is a complete RDD operation.
>>
>> *My motive is to read two tables continuously (being updated) as two
>> distinct Kafka topics being read as two Spark Dataframes and join them
>> based on a key and produce the output. *(I am from Spark-SQL background,
>> pardon my Spark-SQL-ish writing)
>>
>> *It may happen, the first topic is receiving new data 15 mins prior to
>> the second topic, in that scenario, how to proceed? I should not lose any
>> data.*
>>
>> As of now, I want to simply pass paragraphs, read them as RDD, convert to
>> DF and then join to get the common keys as the output. (Just for R).
>>
>> Started using Spark Streaming and Kafka today itself.
>>
>> Please help!
>>
>> Thanks,
>> Aakash.
>>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Georg Heiler
Did you try Spark 2.3 with Structured Streaming? Its watermarking and
plain SQL support might be really interesting for you.
Aakash Basu <aakash.spark@gmail.com> wrote on Wed, 14 March 2018 at
14:57:

> Hi,
>
>
>
> *Info (Using):Spark Streaming Kafka 0.8 package*
>
> *Spark 2.2.1*
> *Kafka 1.0.1*
>
> As of now, I am feeding paragraphs in Kafka console producer and my Spark,
> which is acting as a receiver is printing the flattened words, which is a
> complete RDD operation.
>
> *My motive is to read two tables continuously (being updated) as two
> distinct Kafka topics being read as two Spark Dataframes and join them
> based on a key and produce the output. *(I am from Spark-SQL background,
> pardon my Spark-SQL-ish writing)
>
> *It may happen, the first topic is receiving new data 15 mins prior to the
> second topic, in that scenario, how to proceed? I should not lose any data.*
>
> As of now, I want to simply pass paragraphs, read them as RDD, convert to
> DF and then join to get the common keys as the output. (Just for R).
>
> Started using Spark Streaming and Kafka today itself.
>
> Please help!
>
> Thanks,
> Aakash.
>


Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Aakash Basu
Hi,



*Info (Using):Spark Streaming Kafka 0.8 package*

*Spark 2.2.1*
*Kafka 1.0.1*

As of now, I am feeding paragraphs into the Kafka console producer, and my
Spark job, acting as a receiver, prints the flattened words, which is purely
an RDD operation.

*My goal is to continuously read two tables (which are being updated) as two
distinct Kafka topics, load them as two Spark DataFrames, join them on a key,
and produce the output.* (I come from a Spark SQL background, so pardon my
Spark-SQL-ish writing.)

*The first topic may receive new data 15 minutes before the second topic. How
should I proceed in that scenario? I must not lose any data.*

As of now, I simply want to pass paragraphs, read them as an RDD, convert them
to a DF, and then join to get the common keys as the output. (Just for R&D.)

Started using Spark Streaming and Kafka today itself.

Please help!

Thanks,
Aakash.


How to start practicing Python Spark Streaming in Linux?

2018-03-14 Thread Aakash Basu
Hi all,

Is there any guide on how to kick-start learning PySpark Streaming on a
standalone Ubuntu system? A step-by-step, practical, hands-on guide would be
great.

Also, anything on connecting Kafka with Spark, getting real-time data, and
processing it in micro-batches...

Any help?

Thanks,
Aakash.


Spark Streaming logging on Yarn : issue with rolling in yarn-client mode for driver log

2018-03-07 Thread chandan prakash
Hi All,
I am running my Spark Streaming job in yarn-client mode.
I want to enable log rolling and aggregation in the NodeManager container.
I am using the configs suggested in the Spark doc
<https://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application>:
${spark.yarn.app.container.log.dir}/spark.log  in log4j.properties

For aggregation on YARN I have also enabled these properties:
spark.yarn.rolledLog.includePattern=spark*
yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds=3600
on the Spark and YARN side respectively.

On the executors, my logs are rolled and aggregated every hour, as expected.
*But the issue is:*
for the driver in yarn-client mode, the ${spark.yarn.app.container.log.dir}
value is not available when the driver starts, so I am not able to see driver
logs in the YARN app container directory.
My restrictions are:
1. I want to use yarn-client mode only.
2. I want to log only inside the YARN container, so that the logs are
aggregated and backed up by YARN every hour to HDFS/S3.

*How can I work around this to enable driver log rolling and aggregation as
well?*

Any pointers will be helpful.
Thanks in advance.

-- 
Chandan Prakash


Spark Streaming reading many topics with Avro

2018-03-02 Thread Guillermo Ortiz
Hello,

I want to read several topics with a single Spark Streaming process. I'm
using Avro, and the data in the different topics have different schemas.
Ideally, if I had only one topic, I could implement a deserializer, but I
don't know whether that is possible with many different schemas.

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))



I can only set one value.deserializer, and even if I could set many of them,
I don't know how the process would pick the right one. Any ideas? I guess I
could use ByteArrayDeserializer and do the decoding myself too...
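
The "do it myself" idea can be sketched roughly as follows, assuming the consumer is configured with Kafka's ByteArrayDeserializer so that every value arrives as an Array[Byte], and that the decoder is then chosen from the record's topic name. The Event types, the topic names "users" and "orders", and both decoders are hypothetical stand-ins; real decoders would wrap an Avro reader built from each topic's schema.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical sketch: route raw Kafka payloads to a per-topic decoder.
// Event, the topic names and both decoders are illustrative assumptions,
// not part of any Spark or Kafka API.
object TopicDecoders {
  sealed trait Event
  final case class UserEvent(name: String) extends Event
  final case class OrderEvent(id: Long) extends Event

  // One decoder per topic, keyed by topic name.
  val decoders: Map[String, Array[Byte] => Event] = Map(
    "users"  -> ((bytes: Array[Byte]) => UserEvent(new String(bytes, UTF_8))),
    "orders" -> ((bytes: Array[Byte]) => OrderEvent(new String(bytes, UTF_8).toLong))
  )

  // Returns None for topics without a registered decoder instead of failing.
  def decode(topic: String, payload: Array[Byte]): Option[Event] =
    decoders.get(topic).map(decoder => decoder(payload))
}
```

In a streaming job, the topic and payload would come from each consumed record (the Kafka client's ConsumerRecord exposes topic() and value()), so a single stream subscribed to all topics could call decode per record.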


Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-23 Thread vijay.bvp
Thanks for adding the RDD lineage graph.
I can see 18 parallel tasks for the HDFS read. Was it changed?


What is the Spark job configuration: how many executors, and how many cores
per executor?

I would say keep the number of partitions a multiple of (no. of executors *
cores per executor) for all the RDDs.

If you have 3 executors with 3 cores each assigned to the job, 9 parallel
tasks are possible, so repartition the RDDs to a multiple of 9:

spark.read.parquet().repartition(27)
kafka.createDStream().repartition(27)
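
The sizing rule above can be written down as a small helper. This is only a sketch in plain Scala with no Spark dependency; the object and method names are made up for illustration.

```scala
// Hypothetical helper for picking a partition count that is a multiple of
// the job's task slots (executors * cores per executor).
object PartitionSizing {
  // Total number of tasks the job can run at the same time.
  def totalCores(executors: Int, coresPerExecutor: Int): Int =
    executors * coresPerExecutor

  // Smallest multiple of `slots` that is greater than or equal to `wanted`.
  def roundUpToMultiple(wanted: Int, slots: Int): Int = {
    require(slots > 0, "slots must be positive")
    require(wanted > 0, "wanted must be positive")
    ((wanted + slots - 1) / slots) * slots
  }
}
```

With 3 executors of 3 cores each, totalCores gives 9, and asking for roughly 20 partitions rounds up to 27, the value used in the repartition calls above.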

coalesce with shuffle=false will actually cause problems with upstream
parallelism.

Please test the above scenario and share the findings.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Consuming Data in Parallel using Spark Streaming

2018-02-22 Thread naresh Goud
Here is my understanding; I hope it gives some idea of how this works. It
might be wrong, so please excuse me if it is. I am trying to derive the
execution model from my understanding. Sorry for the long email.

The driver keeps polling Kafka for the latest offset of each topic and then
schedules jobs with the offsets pulled from the topic metadata.

Here a job = (processing logic1 + logic2 + logic3).
These pieces of logic are executed sequentially on the executors, in the
order defined in your application code.

Whenever a job starts, it runs as one transaction that includes the
following activities:
Transaction
{
Get data from Kafka.
Execute logic1 -> logic2 -> logic3
Update processed record offset information
}

Having said that, coming to the approach you mentioned for parallel
processing: if you pass three topics to a single createDirectStream call,
Spark will poll once to get the offsets of all topics, instead of polling
three times as it does when you create three separate streams.

With that approach, the job is scheduled as below.
Job{
Logic1 with its topic offsets
Logic2 with its topic offsets
Logic3 with its topic offsets
}

With this approach, too, the logic executes sequentially.

Coming to your last point about differentiating the data somehow: I am
assuming your application logic is as below, so the scheduled job would look
like this:

Job{
If(topic1 record) {execute logic1}
If(topic2 record) {execute logic2}
If(topic3 record) {execute logic3}
}

This also leads to sequential execution.


Distributed systems are not designed to execute parts of a job in parallel;
instead, they execute the whole job across partitions of the data in parallel.

To summarize, parallelism is possible within the processing of each topic,
not across the processing of different topics. For example, if a topic has
3 partitions, then up to 3 executors would run the job in parallel.





On Thu, Feb 22, 2018 at 9:44 PM Vibhakar, Beejal <
beejal.vibha...@fisglobal.com> wrote:

> Naresh – Thanks for taking out time to respond.
>
>
>
> So is it right to say that it’s the Driver program which at every 30
> seconds tells the executors (Which manage the Streams) to run rather than
> each executor making that decision themselves? And this really makes it
> sequential execution in my case?
>
>
>
> BTW, do you think following would be more suitable way to run this in
> parallel?
>
>
>
>- Right now I am creating 3 DataStream, one for each entity using
>KafkaUtils.createDirectStream API
>- While creating each DataStream, I pass on a single Kafka topic
>- Instead of creating 3 DataStream if I create a single DataStream and
>pass on multiple Kafka topics (TOPIC1, TOPIC2, TOPIC3)  to it, it should be
>able to parallelize the processing (We just need to allocate right number
>of executors)
>- To have separate processing logic for each entity, I just need some
>way to differentiate records of one type of entity from other type of
>entities.
>
>
>
> -Beejal
>
>
>
> *From:* naresh Goud [mailto:nareshgoud.du...@gmail.com]
> *Sent:* Friday, February 23, 2018 8:56 AM
> *To:* Vibhakar, Beejal <beejal.vibha...@fisglobal.com>
> *Subject:* Re: Consuming Data in Parallel using Spark Streaming
>
>
>
> You will have the same behavior both in local and hadoop cluster.
>
> since there will be only one stream context in driver which runs in Single
> JVM).
>
>
>
> On Wed, Feb 21, 2018 at 9:12 PM, Vibhakar, Beejal <
> beejal.vibha...@fisglobal.com> wrote:
>
> I am trying to process data from 3 different Kafka topics using 3
> InputDStream with a single StreamingContext. I am currently testing this
> under Sandbox where I see data processed from one Kafka topic followed by
> other.
>
>
>
> *Question#1:* I want to understand that when I run this program in Hadoop
> cluster, will it process the data in parallel from 3 Kafka topics OR will I
> see the same behavior as I see in my Sandbox?
>
>
>
> *Question#2:* I aim to process the data from all three Kafka topics in
> parallel.  Can I achieve this without breaking this program into 3 separate
> smaller programs?
>
>
>
> Here’s how the code template looks like..
>
>
>
>*val* ssc = *new* StreamingContext(sc, 30)
>
>
>
> *val topic1 = Array(“TOPIC1”)*
>
>
>
>*val* dataStreamTopic1 = KafkaUtils.createDirectStream[Array[Byte],
> GenericRecord](
>
>   ssc,
>
>   PreferConsistent,
>
>   Subscribe[Array[Byte], GenericRecord](*topic1*, kafkaParms))
>
>
>
>  // Processing logic for dataStreamTopic1
>
>
>
>
>
> *val topic2 = Array(“TOPIC2”)*
>
>
>
>*val* dataStreamTopic2 = KafkaUtils.createDirectStream[Array[Byte],
>

RE: Consuming Data in Parallel using Spark Streaming

2018-02-22 Thread Vibhakar, Beejal
Naresh – Thanks for taking the time to respond.

So is it right to say that it is the driver program that tells the executors 
(which manage the streams) to run every 30 seconds, rather than each executor 
making that decision itself? And does this really make execution sequential 
in my case?

BTW, do you think the following would be a more suitable way to run this in parallel?


  *   Right now I am creating 3 DStreams, one for each entity, using the 
KafkaUtils.createDirectStream API.
  *   While creating each DStream, I pass a single Kafka topic.
  *   If, instead of creating 3 DStreams, I create a single DStream and pass 
multiple Kafka topics (TOPIC1, TOPIC2, TOPIC3) to it, it should be able to 
parallelize the processing (we just need to allocate the right number of executors).
  *   To have separate processing logic for each entity, I just need some way 
to differentiate records of one type of entity from those of the other types.

-Beejal

From: naresh Goud [mailto:nareshgoud.du...@gmail.com]
Sent: Friday, February 23, 2018 8:56 AM
To: Vibhakar, Beejal <beejal.vibha...@fisglobal.com>
Subject: Re: Consuming Data in Parallel using Spark Streaming

You will see the same behavior both locally and on a Hadoop cluster,
since there is only one streaming context in the driver, which runs in a single JVM.

On Wed, Feb 21, 2018 at 9:12 PM, Vibhakar, Beejal 
<beejal.vibha...@fisglobal.com> wrote:
I am trying to process data from 3 different Kafka topics using 3 InputDStream 
with a single StreamingContext. I am currently testing this under Sandbox where 
I see data processed from one Kafka topic followed by other.

Question#1: I want to understand that when I run this program in Hadoop 
cluster, will it process the data in parallel from 3 Kafka topics OR will I see 
the same behavior as I see in my Sandbox?

Question#2: I aim to process the data from all three Kafka topics in parallel.  
Can I achieve this without breaking this program into 3 separate smaller 
programs?

Here’s how the code template looks like..

   val ssc = new StreamingContext(sc, 30)

val topic1 = Array("TOPIC1")

   val dataStreamTopic1 = KafkaUtils.createDirectStream[Array[Byte], 
GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic1, kafkaParms))

 // Processing logic for dataStreamTopic1


val topic2 = Array("TOPIC2")

   val dataStreamTopic2 = KafkaUtils.createDirectStream[Array[Byte], 
GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic2, kafkaParms))

 // Processing logic for dataStreamTopic2


val topic3 = Array("TOPIC3")

   val dataStreamTopic3 = KafkaUtils.createDirectStream[Array[Byte], 
GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic3, kafkaParms))

 // Processing logic for dataStreamTopic3

// Start the Streaming
ssc.start()
ssc.awaitTermination()

Here’s how I submit my spark job on my sandbox…

./bin/spark-submit --class  --master local[*] 

Thanks,
Beejal


The information contained in this message is proprietary and/or confidential. 
If you are not the intended recipient, please: (i) delete the message and all 
copies; (ii) do not disclose, distribute or use the message in any manner; and 
(iii) notify the sender immediately. In addition, please be aware that any 
message addressed to our domain is subject to archiving and review by persons 
other than the intended recipient. Thank you.



Consuming Data in Parallel using Spark Streaming

2018-02-21 Thread Vibhakar, Beejal
I am trying to process data from 3 different Kafka topics using 3 InputDStreams 
with a single StreamingContext. I am currently testing this in a sandbox, where 
I see the data from one Kafka topic processed, followed by the next.

Question#1: I want to understand that when I run this program in Hadoop 
cluster, will it process the data in parallel from 3 Kafka topics OR will I see 
the same behavior as I see in my Sandbox?

Question#2: I aim to process the data from all three Kafka topics in parallel.  
Can I achieve this without breaking this program into 3 separate smaller 
programs?

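Here's what the code template looks like:

import org.apache.avro.generic.GenericRecord
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// The batch interval must be a Duration, hence Seconds(30) rather than 30
val ssc = new StreamingContext(sc, Seconds(30))

val topic1 = Array("TOPIC1")

val dataStreamTopic1 = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic1, kafkaParms))

// Processing logic for dataStreamTopic1


val topic2 = Array("TOPIC2")

val dataStreamTopic2 = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic2, kafkaParms))

// Processing logic for dataStreamTopic2


val topic3 = Array("TOPIC3")

val dataStreamTopic3 = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic3, kafkaParms))

// Processing logic for dataStreamTopic3

// Start the streaming context and block until it is stopped
ssc.start()
ssc.awaitTermination()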

Here's how I submit my spark job on my sandbox...

./bin/spark-submit --class  --master local[*] 

Thanks,
Beejal




Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-20 Thread LongVehicle
Hi Vijay,

Thanks for the follow-up.

The reason why we have 90 HDFS files (causing the parallelism of 90 for HDFS
read stage) is because we load the same HDFS data in different jobs, and
these jobs have parallelisms (executors X cores) of 9, 18, 30. The uneven
assignment problem that we had before could not be explained by modulo
operation/remainder, because we sometimes had only 2 executors active out of
9 (while the remaining 7 would stay completely idle).

We tried to repartition the Kafka stream to 90 partitions, but that led to an
even worse load imbalance. It seems that keeping the number of partitions
equal to executors X cores reduces the chance of uneven assignment.

We also tried to repartition the HDFS data to 9 partitions, but it did not
help, because repartition takes the initial locality of the data into account,
so 9 partitions may end up on 9 different cores. We also tried setting
spark.shuffle.reduceLocality.enabled=false, but it did not help. Last but
not least, we want to avoid coalesce, because then the partitions would depend
on the HDFS block distribution, so they would not be hash partitioned (which
we need for the join).

Please find below the relevant UI snapshots:

[UI snapshots not preserved in this archive]

The snapshots refer to the batch in which the RDD is reloaded (WholeStageCodegen
1147 is gray except in the batch at reload time, which happens every 30
minutes).

Thanks a lot!






Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-19 Thread vijay.bvp
Apologies for the long answer.

Understanding the partitioning at each stage of the RDD graph/lineage is
important for efficient parallelism and balanced load. This applies to
working with any source, streaming or static.
You have a tricky situation here: one Kafka source with 9 partitions and a
static data set with 90 partitions.

Before joining the two, try to make the number of partitions equal for both
RDDs. You can either repartition the Kafka source to 90 partitions, coalesce
the flat-file RDD to 9 partitions, or pick something midway between 9 and 90.

In general, the number of tasks that can run in parallel equals the total
number of cores the Spark job has (no. of executors * no. of cores per
executor).

As an example: if the flat file has 90 partitions and you set 4 executors,
each with 5 cores, for a total of 20 cores, the tasks get scheduled as
20+20+20+20+10. As you can see, the last wave has only 10 tasks even though
you have 20 cores.

Compare this with 6 executors, each with 5 cores, for a total of 30 cores;
then it would be 30+30+30.

Ideally, the number of partitions of each RDD (in the lineage graph) should
be a multiple of the total number of cores available to the Spark job.
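
The wave arithmetic in the example can be reproduced with a few lines of plain Scala. This is a simplified sketch that assumes every task takes equally long, which real jobs rarely do; the object and method names are made up for illustration.

```scala
// Simplified model of how `partitions` tasks fill `cores` task slots,
// assuming all tasks take the same time (an idealisation).
object TaskWaves {
  // Sizes of the successive scheduling waves.
  def waves(partitions: Int, cores: Int): List[Int] = {
    require(cores > 0, "cores must be positive")
    require(partitions >= 0, "partitions must not be negative")
    val full = List.fill(partitions / cores)(cores)
    val rest = partitions % cores
    if (rest == 0) full else full :+ rest
  }

  // Fraction of the slots that are busy during the final wave.
  def lastWaveUtilisation(partitions: Int, cores: Int): Double =
    waves(partitions, cores).lastOption.map(_.toDouble / cores).getOrElse(0.0)
}
```

For 90 partitions, waves(90, 20) yields 20, 20, 20, 20, 10 with half the slots idle in the last wave, while waves(90, 30) yields three full waves of 30.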

In terms of data locality, prefer process-local over node-local over
rack-local.
As an example, 5 executors with 4 cores each and 4 executors with 5 cores each
both give 20 cores in total, but with 4 executors there is less shuffling and
more process-local/node-local work.

I need to look at the RDD graph for this:
df = sqlContext.read.parquet(...)
and
RDD rdd = df.as[T].rdd


On your final question: you should be able to tune the static RDD without an
external store by carefully looking at each batch's RDD lineage over the 30
minutes before the RDD is refreshed again.

If you would like to use an external system, Apache Ignite is something that
you can use as a cache.

thanks
Vijay











Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment

2018-02-19 Thread Aleksandar Vitorovic
 Hi Vijay,

Thank you very much for your reply. Setting the number of partitions
explicitly in the join, and memory pressure influence on partitioning were
definitely very good insights.

In the end, we avoided the issue of uneven load balancing completely by doing
the following two things:
a) Reducing the number of executors, and increasing the number of cores and
executor memory
b) Increasing the batch interval size from 15s to 30s.

Here is a nice blog post that explains how to improve performance for Spark
jobs in general:
https://mapr.com/blog/performance-tuning-apache-kafkaspark-streaming-system/
.

@Vijay: And here are the responses to your questions:
1) Correct.

2) This is exactly what confuses us: There is nothing between the following
lines:
df = sqlContext.read.parquet(...)
and
RDD rdd = df.as[T].rdd

We saw that a separate query plan is executed on converting DataFrame to
RDD (.rdd method). Is it equivalent to repartition, coalesce or something
else?

3) Exactly.

4) We are caching the static rdd for 30 minutes. That is, we have a trait
with readLast method that returns the last read RDD, and once the RDD is
more than 30 minutes old, we reload its content from disk using  df =
sqlContext.read.parquet(...).

---

My final question is the following: What would be the most efficient way
(including possibly an external key-value store) for efficient store,
update and retrieval of final_rdd? The state may grow beyond 3GB, and we
want to maintain our scalability and latency. In fact, we have many Spark
jobs that join the same RDD with different Kafka streams.

Thank you very much!

On Wed, Jan 31, 2018 at 11:24 AM, vijay.bvp  wrote:

> Summarizing
>
> 1) Static data set read from Parquet files as DataFrame in HDFS has initial
> parallelism of 90 (based on no input files)
>
> 2) static data set DataFrame is converted as rdd, and rdd has parallelism
> of
> 18 this was not expected
> dataframe.rdd is lazy evaluation there must be some operation you were
> doing
> that would have triggered
> conversion from 90 to 18, this would be some operation that breaks
> stage/requires shuffling such as groupby, reduceby, repartition,coalesce
> if you are using coalesce, the second parameter shuff is by default false
> which means upstream parallelism is not preserved.
>
> 3) you have DStream of Kafka source with 9 partitions this is joined with
> above static data set? when joining have you tried setting up numPartitions
> an optional parameter to provide no of partitions required.
>
> 4) your batch interval is 15 seconds but you are caching the static data
> set
> for 30 minutes, what exactly you mean caching for 30 minutes?
>
> Note when you cache data based on the memory pressure there is chance that
> partitioning is not preserved.
>
> it would be useful to provide spark UI screen shots for one complete batch,
> the DAG and other details
>
> thanks
> Vijay
>
>
>
>
>


Re: Spark Streaming withWatermark

2018-02-06 Thread Tathagata Das
That may very well be possible. The watermark delay guarantees that any
record newer than or equal to watermark (that is, max event time seen - 20
seconds), will be considered and never be ignored.  It does not guarantee
the other way, that is, it does NOT guarantee that records older than the
watermark will definitely get ignored. In a distributed setting, it is
super hard to get strict guarantees, so we choose to err on the side of
being more inclusive (that is, include some old data), rather than the side
of dropping any not-old data.

I will update the programming guide to make this more clear.
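
To make the guarantee concrete, here is a toy model in plain Scala (not Spark code). It assumes event times are plain Long seconds, that the watermark is recomputed only at the end of each micro-batch, and that everything older than the watermark is dropped, which is the most aggressive behaviour the guarantee allows; as described above, Spark itself may keep more. All names are hypothetical.

```scala
// Toy model of watermark advancement across micro-batches.
object WatermarkModel {
  // Watermark after a batch: max event time seen minus the delay,
  // never moving backwards.
  def advance(previous: Long, batch: Seq[Long], delay: Long): Long =
    if (batch.isEmpty) previous else math.max(previous, batch.max - delay)

  // Each batch is filtered against the watermark left by earlier batches.
  // Returns the event times kept in each batch.
  def run(batches: Seq[Seq[Long]], delay: Long): Seq[Seq[Long]] = {
    val start = (Long.MinValue, Vector.empty[Seq[Long]])
    val (_, keptPerBatch) = batches.foldLeft(start) {
      case ((watermark, acc), batch) =>
        val kept = batch.filter(_ >= watermark)
        (advance(watermark, batch, delay), acc :+ kept)
    }
    keptPerBatch
  }
}
```

Measuring seconds relative to 12:00:00, the 11:59:00 record is -60. If it arrives in the same batch as 0, 1, 2, 3 it is kept, because no watermark exists yet; if it arrives in a later batch, the watermark is already 3 - 20 = -17 and the model drops it.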

On Feb 6, 2018 5:01 PM, "Vishnu Viswanath" 
wrote:

> Could it be that these messages were processed in the same micro batch? In
> that case, watermark will be updated only after the batch finishes which
> did not have any effect of the late data in the current batch.
>
> On Tue, Feb 6, 2018 at 4:18 PM Jiewen Shao  wrote:
>
>> Ok, Thanks for confirmation.
>>
>> So based on my code, I have messages with following timestamps (converted
>> to more readable format) in the following order:
>>
>> 2018-02-06 12:00:00
>> 2018-02-06 12:00:01
>> 2018-02-06 12:00:02
>> 2018-02-06 12:00:03
>> 2018-02-06 11:59:00  <-- this message should not be counted, right?
>> however in my test, this one is still counted
>>
>>
>>
>> On Tue, Feb 6, 2018 at 2:05 PM, Vishnu Viswanath <
>> vishnu.viswanat...@gmail.com> wrote:
>>
>>> Yes, that is correct.
>>>
>>> On Tue, Feb 6, 2018 at 4:56 PM, Jiewen Shao 
>>> wrote:
>>>
>>>> Vishnu, thanks for the reply
>>>> so "event time" and "window end time" have nothing to do with current
>>>> system timestamp, watermark moves with the higher value of "timestamp"
>>>> field of the input and never moves down, is that correct understanding?
>>>>
>>>>
>>>> On Tue, Feb 6, 2018 at 11:47 AM, Vishnu Viswanath <
>>>> vishnu.viswanat...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> 20 second corresponds to when the window state should be cleared. For
>>>>> the late message to be dropped, it should come in after you receive a
>>>>> message with event time >= window end time + 20 seconds.
>>>>>
>>>>> I wrote a post on this recently:
>>>>> http://vishnuviswanath.com/spark_structured_streaming.html#watermark
>>>>>
>>>>> Thanks,
>>>>> Vishnu
>>>>>
>>>>> On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao
>>>>> wrote:
>>>>>
>>>>>> sample code:
>>>>>>
>>>>>> Let's say Xyz is POJO with a field called timestamp,
>>>>>>
>>>>>> regarding code withWatermark("timestamp", "20 seconds")
>>>>>>
>>>>>> I expect the msg with timestamp 20 seconds or older will be dropped,
>>>>>> what does 20 seconds compare to? based on my test nothing was dropped no
>>>>>> matter how old the timestamp is, what did i miss?
>>>>>>
>>>>>> Dataset xyz = lines
>>>>>> .as(Encoders.STRING())
>>>>>> .map((MapFunction) value -> mapper.readValue(value,
>>>>>> Xyz.class), Encoders.bean(Xyz.class));
>>>>>>
>>>>>> Dataset aggregated = xyz.withWatermark("timestamp", "20 seconds")
>>>>>> .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"),
>>>>>> xyz.col("x") //tumbling window of size 5 seconds (timestamp)
>>>>>> ).count();
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>
>>>
>>


Re: Spark Streaming withWatermark

2018-02-06 Thread Vishnu Viswanath
Could it be that these messages were processed in the same micro-batch? In
that case, the watermark is updated only after the batch finishes, so it
would have had no effect on the late data in the current batch.

On Tue, Feb 6, 2018 at 4:18 PM Jiewen Shao  wrote:

> Ok, Thanks for confirmation.
>
> So based on my code, I have messages with following timestamps (converted
> to more readable format) in the following order:
>
> 2018-02-06 12:00:00
> 2018-02-06 12:00:01
> 2018-02-06 12:00:02
> 2018-02-06 12:00:03
> 2018-02-06 11:59:00  <-- this message should not be counted, right?
> however in my test, this one is still counted
>
>
>
> On Tue, Feb 6, 2018 at 2:05 PM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>> Yes, that is correct.
>>
>> On Tue, Feb 6, 2018 at 4:56 PM, Jiewen Shao 
>> wrote:
>>
>>> Vishnu, thanks for the reply
>>> so "event time" and "window end time" have nothing to do with current
>>> system timestamp, watermark moves with the higher value of "timestamp"
>>> field of the input and never moves down, is that correct understanding?
>>>
>>>
>>> On Tue, Feb 6, 2018 at 11:47 AM, Vishnu Viswanath <
>>> vishnu.viswanat...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> 20 second corresponds to when the window state should be cleared. For
>>>> the late message to be dropped, it should come in after you receive a
>>>> message with event time >= window end time + 20 seconds.
>>>>
>>>> I wrote a post on this recently:
>>>> http://vishnuviswanath.com/spark_structured_streaming.html#watermark
>>>>
>>>> Thanks,
>>>> Vishnu
>>>>
>>>> On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao
>>>> wrote:
>>>>
>>>>> sample code:
>>>>>
>>>>> Let's say Xyz is POJO with a field called timestamp,
>>>>>
>>>>> regarding code withWatermark("timestamp", "20 seconds")
>>>>>
>>>>> I expect the msg with timestamp 20 seconds or older will be dropped,
>>>>> what does 20 seconds compare to? based on my test nothing was dropped no
>>>>> matter how old the timestamp is, what did i miss?
>>>>>
>>>>> Dataset xyz = lines
>>>>> .as(Encoders.STRING())
>>>>> .map((MapFunction) value -> mapper.readValue(value,
>>>>> Xyz.class), Encoders.bean(Xyz.class));
>>>>>
>>>>> Dataset aggregated = xyz.withWatermark("timestamp", "20 seconds")
>>>>> .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"),
>>>>> xyz.col("x") //tumbling window of size 5 seconds (timestamp)
>>>>> ).count();
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>
>>
>


Re: Spark Streaming withWatermark

2018-02-06 Thread Jiewen Shao
Ok, Thanks for confirmation.

So based on my code, I have messages with following timestamps (converted
to more readable format) in the following order:

2018-02-06 12:00:00
2018-02-06 12:00:01
2018-02-06 12:00:02
2018-02-06 12:00:03
2018-02-06 11:59:00  <-- this message should not be counted, right? however
in my test, this one is still counted



On Tue, Feb 6, 2018 at 2:05 PM, Vishnu Viswanath <
vishnu.viswanat...@gmail.com> wrote:

> Yes, that is correct.
>
> On Tue, Feb 6, 2018 at 4:56 PM, Jiewen Shao 
> wrote:
>
>> Vishnu, thanks for the reply
>> so "event time" and "window end time" have nothing to do with current
>> system timestamp, watermark moves with the higher value of "timestamp"
>> field of the input and never moves down, is that correct understanding?
>>
>>
>> On Tue, Feb 6, 2018 at 11:47 AM, Vishnu Viswanath <
>> vishnu.viswanat...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> 20 second corresponds to when the window state should be cleared. For
>>> the late message to be dropped, it should come in after you receive a
>>> message with event time >= window end time + 20 seconds.
>>>
>>> I wrote a post on this recently:
>>> http://vishnuviswanath.com/spark_structured_streaming.html#watermark
>>>
>>> Thanks,
>>> Vishnu
>>>
>>> On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao 
>>> wrote:
>>>
>>>> sample code:
>>>>
>>>> Let's say Xyz is POJO with a field called timestamp,
>>>>
>>>> regarding code withWatermark("timestamp", "20 seconds")
>>>>
>>>> I expect the msg with timestamp 20 seconds or older will be dropped,
>>>> what does 20 seconds compare to? based on my test nothing was dropped no
>>>> matter how old the timestamp is, what did i miss?
>>>>
>>>> Dataset xyz = lines
>>>> .as(Encoders.STRING())
>>>> .map((MapFunction) value -> mapper.readValue(value,
>>>> Xyz.class), Encoders.bean(Xyz.class));
>>>>
>>>> Dataset aggregated = xyz.withWatermark("timestamp", "20 seconds")
>>>> .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"),
>>>> xyz.col("x") //tumbling window of size 5 seconds (timestamp)
>>>> ).count();
>>>>
>>>> Thanks
>>>>
>>>>
>>
>


Re: Spark Streaming withWatermark

2018-02-06 Thread Vishnu Viswanath
Yes, that is correct.

On Tue, Feb 6, 2018 at 4:56 PM, Jiewen Shao  wrote:

> Vishnu, thanks for the reply
> so "event time" and "window end time" have nothing to do with current
> system timestamp, watermark moves with the higher value of "timestamp"
> field of the input and never moves down, is that correct understanding?
>
>
> On Tue, Feb 6, 2018 at 11:47 AM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>> Hi
>>
>> 20 second corresponds to when the window state should be cleared. For the
>> late message to be dropped, it should come in after you receive a message
>> with event time >= window end time + 20 seconds.
>>
>> I wrote a post on this recently:
>> http://vishnuviswanath.com/spark_structured_streaming.html#watermark
>>
>> Thanks,
>> Vishnu
>>
>> On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao 
>> wrote:
>>
>>> sample code:
>>>
>>> Let's say Xyz is POJO with a field called timestamp,
>>>
>>> regarding code withWatermark("timestamp", "20 seconds")
>>>
>>> I expect the msg with timestamp 20 seconds or older will be dropped,
>>> what does 20 seconds compare to? based on my test nothing was dropped no
>>> matter how old the timestamp is, what did i miss?
>>>
>>> Dataset xyz = lines
>>> .as(Encoders.STRING())
>>> .map((MapFunction) value -> mapper.readValue(value, 
>>> Xyz.class), Encoders.bean(Xyz.class));
>>>
>>> Dataset aggregated = xyz.withWatermark("timestamp", "20 seconds")
>>> .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"), 
>>> xyz.col("x") //tumbling window of size 5 seconds (timestamp)
>>> ).count();
>>>
>>> Thanks
>>>
>>>
>


Re: Spark Streaming withWatermark

2018-02-06 Thread Vishnu Viswanath
Hi

The 20 seconds determines when the window state is cleared. For a late
message to be dropped, it has to arrive after you have received a message
with event time >= window end time + 20 seconds.
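
The rule above can be sketched in plain Java, without Spark. This is only an
illustrative model under stated assumptions: the class WatermarkSketch and its
methods are hypothetical names, not Spark APIs; event times are epoch
milliseconds; and the watermark is updated after every record, whereas Spark
recomputes it between micro-batches, so real dropping can lag behind what
this model shows.

```java
public class WatermarkSketch {
    static final long WINDOW_MS = 5_000L;  // "5 seconds" tumbling window
    static final long DELAY_MS = 20_000L;  // "20 seconds" watermark delay

    // Highest event time seen so far; the wall clock is never consulted.
    private long maxEventTimeMs = Long.MIN_VALUE;

    /** End (exclusive) of the tumbling window that contains the event time. */
    static long windowEnd(long eventTimeMs) {
        long start = eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_MS);
        return start + WINDOW_MS;
    }

    /** Watermark = max event time seen - delay; it can only move forward. */
    long watermark() {
        return maxEventTimeMs == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxEventTimeMs - DELAY_MS;
    }

    /** Returns true if the record is kept, false if it is dropped as too late. */
    boolean accept(long eventTimeMs) {
        // Dropped only when the record's window already ended at or before the watermark.
        boolean tooLate = windowEnd(eventTimeMs) <= watermark();
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
        return !tooLate;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch();
        long[] eventTimes = {12_000L, 1_000L, 34_000L, 12_500L, 35_000L, 12_500L};
        for (long t : eventTimes) {
            boolean kept = w.accept(t);
            System.out.println("event=" + t + " kept=" + kept
                    + " watermark=" + w.watermark());
        }
    }
}
```

In this model only the final record is dropped: its window ends at 15000, and
by then a record with event time 35000 (= 15000 + 20000) has been seen. Old
records that arrive before the maximum event time has advanced that far are
still kept, which is consistent with a test in which nothing appears to be
dropped.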

I wrote a post on this recently:
http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Thanks,
Vishnu

On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao  wrote:

> sample code:
>
> Let's say Xyz is POJO with a field called timestamp,
>
> regarding code withWatermark("timestamp", "20 seconds")
>
> I expected messages with a timestamp 20 seconds old or older to be dropped.
> What is the 20 seconds compared against? In my tests nothing was dropped,
> no matter how old the timestamp was. What did I miss?
>
> Dataset<Xyz> xyz = lines
>         .as(Encoders.STRING())
>         .map((MapFunction<String, Xyz>) value ->
>                 mapper.readValue(value, Xyz.class), Encoders.bean(Xyz.class));
>
> Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds")
>         // tumbling window of size 5 seconds on the event-time column
>         .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"),
>                 xyz.col("x"))
>         .count();
>
> Thanks
>
>


Spark Streaming withWatermark

2018-02-06 Thread Jiewen Shao
Sample code:

Let's say Xyz is a POJO with a field called timestamp.

Regarding the code withWatermark("timestamp", "20 seconds"):

I expected messages with a timestamp 20 seconds old or older to be dropped.
What is the 20 seconds compared against? In my tests nothing was dropped,
no matter how old the timestamp was. What did I miss?

Dataset<Xyz> xyz = lines
        .as(Encoders.STRING())
        .map((MapFunction<String, Xyz>) value ->
                mapper.readValue(value, Xyz.class), Encoders.bean(Xyz.class));

Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds")
        // tumbling window of size 5 seconds on the event-time column
        .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"),
                xyz.col("x"))
        .count();

Thanks


Re: Prefer Structured Streaming over Spark Streaming (DStreams)?

2018-02-02 Thread Biplob Biswas
It's great to hear two different viewpoints, and thanks a lot for your input,
Michael. For now, our application performs an ETL process: it reads data from
Kafka, stores it in HBase, performs basic enhancement, and pushes the data
out on a Kafka topic.

We have a difference of opinion here: a few people want to go with DStreams,
arguing that it exposes the primitive RDD abstraction and that its
functionality is better and easier to use than Structured Streaming. We don't
have any event-time requirement and we are not using windowing either, just
some basic grouping, enhancement and storing.

That's why the question was directed towards Structured Streaming vs
DStreams.

Also, when you say,

> Structured streaming is a completely new implementation that does not use
> DStreams at all, but instead directly runs jobs using RDDs

I understand that it doesn't use DStreams, but I thought Structured Streaming
runs jobs on RDDs via DataFrames, and that in the future, if the RDD
abstraction needs to be switched, this will be done by replacing RDDs with
something else. Please correct me if I have understood this wrong.

Thanks & Regards
Biplob Biswas

On Thu, Feb 1, 2018 at 12:12 AM, Michael Armbrust 
wrote:

> At this point I recommend that new applications are built using structured
> streaming. The engine was GA-ed as of Spark 2.2 and I know of several very
> large (trillions of records) production jobs that are running in Structured
> Streaming.  All of our production pipelines at databricks are written using
> structured streaming as well.
>
> Regarding the comparison with RDDs: The situation here is different than
> when thinking about batch DataFrames vs. RDDs.  DataFrames are "just" a
> higher-level abstraction on RDDs.  Structured streaming is a completely new
> implementation that does not use DStreams at all, but instead directly runs
> jobs using RDDs.  The advantages over DStreams include:
>  - The ability to start and stop individual queries (rather than needing
> to start/stop a separate StreamingContext)
>  - The ability to upgrade your stream and still start from an existing
> checkpoint
>  - Support for working with Spark SQL data sources (json, parquet, etc)
>  - Higher level APIs (DataFrames and SQL) and lambda functions (Datasets)
>  - Support for event time aggregation
>
> At this point, with the addition of mapGroupsWithState and
> flatMapGroupsWithState, I think we should be at feature parity with
> DStreams (and the state store does incremental checkpoints that are more
> efficient than the DStream store).  However if there are applications you
> are having a hard time porting over, please let us know!
>
> On Wed, Jan 31, 2018 at 5:42 AM, vijay.bvp  wrote:
>
>> Here are my two cents; experts, please correct me if I am wrong.
>>
>> It's important to understand why you would choose one over the other, and
>> for what kind of use case. There may come a time when the low-level APIs
>> are abstracted away and become legacy, but for now the RDD API is Spark's
>> core low-level API: all higher-level APIs ultimately translate to RDDs,
>> and RDDs are immutable.
>>
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations
>> These are the operations that are not supported, and this list needs to be
>> validated against the use case you have.
>>
>> From my experience, Structured Streaming is still new and the DStreams API
>> is a mature API.
>> Some things that are missing or need more exploration:
>>
>> Watermarking/windowing based on the number of records in a particular
>> window.
>>
>> Assuming you have a watermark and windowing on the event time of the data,
>> the resulting DataFrame is a grouped dataset, and the only thing you can do
>> is run aggregate functions. You can't simply use that output as another
>> DataFrame and manipulate it. There is a custom aggregator, but I feel it's
>> limited.
>>
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#arbitrary-stateful-operations
>> There is an option to do stateful operations using GroupState, where the
>> function gets an iterator of events for that window. This is the closest
>> access to the StateStore a developer can get.
>> The arbitrary state that a programmer can keep across invocations has its
>> limitations: how much state can we keep? Is that state stored in driver
>> memory? What happens if the Spark job fails; is the state checkpointed and
>> restored?
>>
>> thanks
>> Vijay
>>
>

