Re: [SS] Console sink not supporting recovering from checkpoint location? Why?

2017-08-08 Thread Jacek Laskowski
Hi Michael,

That reflects my sentiments so well. Thanks for having confirmed my thoughts!

https://issues.apache.org/jira/browse/SPARK-21667

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Aug 8, 2017 at 12:37 AM, Michael Armbrust
 wrote:
> I think there is really no good reason for this limitation.
>
> On Mon, Aug 7, 2017 at 2:58 AM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> While exploring checkpointing with kafka source and console sink I've
>> got the exception:
>>
>> // today's build from the master
>> scala> spark.version
>> res8: String = 2.3.0-SNAPSHOT
>>
>> scala> val q = records.
>>  |   writeStream.
>>  |   format("console").
>>  |   option("truncate", false).
>>  |   option("checkpointLocation", "/tmp/checkpoint"). // <--
>> checkpoint directory
>>  |   trigger(Trigger.ProcessingTime(10.seconds)).
>>  |   outputMode(OutputMode.Update).
>>  |   start
>> org.apache.spark.sql.AnalysisException: This query does not support
>> recovering from checkpoint location. Delete /tmp/checkpoint/offsets to
>> start over.;
>>   at
>> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222)
>>   at
>> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>>   at
>> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
>>   ... 61 elided
>>
>> The "trigger" is the change
>> https://issues.apache.org/jira/browse/SPARK-16116 and this line in
>> particular
>> https://github.com/apache/spark/pull/13817/files#diff-d35e8fce09686073f81de598ed657de7R277.
>>
>> Why is this needed? I can't think of a use case where console sink
>> could not recover from checkpoint location (since all the information
>> is available). I'm lost on it and would appreciate some help (to
>> recover :))
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>




Re:

2017-08-08 Thread Ramesh Krishnan
unsubscribe

On Mon, Aug 7, 2017 at 2:57 PM, Sumit Saraswat 
wrote:

> Unsubscribe
>


Spark Streaming job statistics

2017-08-08 Thread KhajaAsmath Mohammed
Hi,

I am running a Spark Streaming job which receives data from Azure IoT Hub. I
am not sure whether the connection was successful and whether it is receiving
any data. Does the Input column show how much data has been read if the
connection was successful?

[image: Inline image 1]


IndexOutOfBoundException in catalyst when doing multiple approxDistinctCount

2017-08-08 Thread AssafMendelson
Hi,

I am doing a large number of aggregations on a dataframe (without groupBy) to
get some statistics. As part of this I am doing an approx_count_distinct(c, 0.01).
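
Roughly, the shape of that aggregation is something like the following (a minimal
sketch with hypothetical column names, with df standing in for the actual dataframe):

import org.apache.spark.sql.functions.{approx_count_distinct, col}

// Hypothetical columns; the real job aggregates many more of them.
val statCols = Seq("colA", "colB", "colC")

// One approx_count_distinct(c, 0.01) per column, with no groupBy.
val exprs = statCols.map(c => approx_count_distinct(col(c), 0.01).alias(s"${c}_distinct"))
val stats = df.agg(exprs.head, exprs.tail: _*)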
Everything works fine but when I do the same aggregation a second time (for 
each column) I get the following error:



[Stage 2:> (0 + 2) / 2]
[WARN] org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Error calculating stats of compiled class.
java.lang.IndexOutOfBoundsException: Index: 4355, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at 
org.codehaus.janino.util.ClassFile.getConstantPoolInfo(ClassFile.java:556)
at 
org.codehaus.janino.util.ClassFile.getConstantUtf8(ClassFile.java:572)
at 
org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1513)
at 
org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
at 
org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:996)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:993)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:993)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:961)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1027)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1024)
at 
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at 
org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at 
org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
at 
org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
at 
org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at 
org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:906)
at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:412)
at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:366)
at 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:890)
at 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:130)
at 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:140)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator.generateResultProjection(AggregationIterator.scala:235)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:266)
at 
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.<init>(SortBasedAggregationIterator.scala:39)
at 
org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:86)
at 
org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:77)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at 
org.apache.spark

unsubscribe

2017-08-08 Thread Ramesh Krishnan
unsubscribe


Question about 'Structured Streaming'

2017-08-08 Thread Brian Wylie
Hi All,

I've read the new information about Structured Streaming in Spark, looks
super great.

Resources that I've looked at
- https://spark.apache.org/docs/latest/streaming-programming-guide.html
-
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
- https://spark.apache.org/docs/latest/streaming-custom-receivers.html
-
http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html

+ YouTube videos from Spark Summit 2016/2017

So finally getting to my question:

I have Python code that yields a Python generator... this is a great
streaming approach within Python. I've used it for network packet
processing and a bunch of other stuff. I'd love to simply hook up this
generator (that yields python dictionaries) along with a schema definition
to create an  'unbounded DataFrame' as discussed in
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

Possible approaches:
- Make a custom receiver in Python:
https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- Use Kafka (this is definitely possible and good but overkill for my use
case)
- Send data out a socket and use socketTextStream to pull back in (seems a
bit silly to me)
- Other???

Since Python generators fit so naturally into streaming pipelines, I'd think
it would be straightforward to 'couple' a Python generator into a
Spark Structured Streaming pipeline.

I've put together a small notebook just to give a concrete example
(streaming Bro IDS network data)
https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb

Any thoughts/suggestions/pointers are greatly appreciated.

-Brian


Unsubscribe

2017-08-08 Thread Sumit Saraswat
Unsubscribe


speculative execution in spark

2017-08-08 Thread john_test_test
Is it possible in any way to take advantage of the already processed portion
of a failed task, so that speculative execution reassigns only what is left
of the original task to another node? If yes, how can I read it from memory?
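
For reference, speculative execution itself is enabled and tuned through a handful of
settings (a minimal sketch; the values shown are illustrative, not recommendations).
Note that a speculative copy re-runs the task from scratch on another node; it does not
pick up the partial output of the original attempt.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")            // launch speculative copies of slow tasks
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish first
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median a task must be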



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/speculative-execution-in-spark-tp29042.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Fwd: Python question about 'Structured Streaming'

2017-08-08 Thread Brian Wylie
Hi All,

I've read the new information about Structured Streaming in Spark, looks
super great.

Resources that I've looked at
- https://spark.apache.org/docs/latest/streaming-programming-guide.html
- https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
- https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html

+ YouTube videos from Spark Summit 2016/2017

So finally getting to my question:

I have Python code that yields a Python generator... this is a great
streaming approach within Python. I've used it for network packet
processing and a bunch of other stuff. I'd love to simply hook up this
generator (that yields python dictionaries) along with a schema definition
to create an 'unbounded DataFrame' as discussed in
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

Possible approaches:
- Make a custom receiver in Python:
https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- Use Kafka (this is definitely possible and good but overkill for my use
case)
- Send data out a socket and use socketTextStream to pull back in (seems a
bit silly to me)
- Other???

Since Python generators fit so naturally into streaming pipelines, I'd think
it would be straightforward to 'couple' a Python generator into a
Spark Structured Streaming pipeline.

I've put together a small notebook just to give a concrete example
(streaming Bro IDS network data):
https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb

Any thoughts/suggestions/pointers are greatly appreciated.

-Brian


count exceed int.MaxValue

2017-08-08 Thread makoto
Hello,
I'd like to count more than Int.MaxValue elements, but I encountered the following
error.

scala> val rdd = sc.parallelize(1L to Int.MaxValue*2.toLong)
rdd: org.apache.spark.rdd.RDD[Long] = ParallelCollectionRDD[28] at
parallelize at <console>:24

scala> rdd.count
java.lang.IllegalArgumentException: More than Int.MaxValue elements.
  at
scala.collection.immutable.NumericRange$.check$1(NumericRange.scala:304)
  at scala.collection.immutable.NumericRange$.count(NumericRange.scala:314)
  at
scala.collection.immutable.NumericRange.numRangeElements$lzycompute(NumericRange.scala:52)
  at
scala.collection.immutable.NumericRange.numRangeElements(NumericRange.scala:51)
  at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
  at
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:145)
  at
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided

How can I avoid the error?
A similar problem is as follows:
scala> rdd.reduce((a,b)=> (a + b))
java.lang.IllegalArgumentException: More than Int.MaxValue elements.
  at
scala.collection.immutable.NumericRange$.check$1(NumericRange.scala:304)
  at scala.collection.immutable.NumericRange$.count(NumericRange.scala:314)
  at
scala.collection.immutable.NumericRange.numRangeElements$lzycompute(NumericRange.scala:52)
  at
scala.collection.immutable.NumericRange.numRangeElements(NumericRange.scala:51)
  at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
  at
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:145)
  at
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
  ... 48 elided


Re: count exceed int.MaxValue

2017-08-08 Thread Vadim Semenov
Scala doesn't support ranges with more than Int.MaxValue elements:
https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/immutable/Range.scala?utf8=✓#L89

You can create two RDDs and union them:

scala> val rdd = sc.parallelize(1L to
Int.MaxValue.toLong).union(sc.parallelize(1L to Int.MaxValue.toLong))
rdd: org.apache.spark.rdd.RDD[Long] = UnionRDD[10] at union at <console>:24

scala> rdd.count
[Stage 0:> (0 + 4) / 8]


Also instead of creating the range on the driver, you can create your RDD
in parallel:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val numberOfParts = 100
val numberOfElementsInEachPart = Int.MaxValue.toDouble / 100

val rdd = sc.parallelize(1 to numberOfParts).flatMap(partNum => {
  val begin = ((partNum - 1) * numberOfElementsInEachPart).toLong
  val end = (partNum * numberOfElementsInEachPart).toLong
  begin to end
})

// Exiting paste mode, now interpreting.

numberOfParts: Int = 100
numberOfElementsInEachPart: Double = 2.147483647E7
rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[15] at flatMap at
<console>:31

scala> rdd.count
res10: Long = 2147483747
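
Another option (a minimal sketch, assuming a Spark 2.x shell where spark is the
SparkSession) is to skip the driver-side Scala range entirely and let Spark
generate the values with spark.range:

// end is exclusive, so this covers 1L to Int.MaxValue * 2L inclusive
val ds = spark.range(1L, Int.MaxValue * 2L + 1L)
ds.count  // the element count is not limited to Int.MaxValue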

On Tue, Aug 8, 2017 at 1:26 PM, makoto  wrote:

> Hello,
> I'd like to count more than Int.MaxValue. But I encountered the following
> error.
>
> scala> val rdd = sc.parallelize(1L to Int.MaxValue*2.toLong)
> rdd: org.apache.spark.rdd.RDD[Long] = ParallelCollectionRDD[28] at
> parallelize at :24
>
> scala> rdd.count
> java.lang.IllegalArgumentException: More than Int.MaxValue elements.
>   at scala.collection.immutable.NumericRange$.check$1(
> NumericRange.scala:304)
>   at scala.collection.immutable.NumericRange$.count(
> NumericRange.scala:314)
>   at scala.collection.immutable.NumericRange.numRangeElements$
> lzycompute(NumericRange.scala:52)
>   at scala.collection.immutable.NumericRange.numRangeElements(
> NumericRange.scala:51)
>   at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
>   at org.apache.spark.rdd.ParallelCollectionRDD$.slice(
> ParallelCollectionRDD.scala:145)
>   at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(
> ParallelCollectionRDD.scala:97)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
>   at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
>   ... 48 elided
>
> How can I avoid the error ?
> A similar problem is as follows:
> scala> rdd.reduce((a,b)=> (a + b))
> java.lang.IllegalArgumentException: More than Int.MaxValue elements.
>   at scala.collection.immutable.NumericRange$.check$1(
> NumericRange.scala:304)
>   at scala.collection.immutable.NumericRange$.count(
> NumericRange.scala:314)
>   at scala.collection.immutable.NumericRange.numRangeElements$
> lzycompute(NumericRange.scala:52)
>   at scala.collection.immutable.NumericRange.numRangeElements(
> NumericRange.scala:51)
>   at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
>   at org.apache.spark.rdd.ParallelCollectionRDD$.slice(
> ParallelCollectionRDD.scala:145)
>   at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(
> ParallelCollectionRDD.scala:97)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
>   ... 48 elided
>
>
>


Re: Spark Streaming job statistics

2017-08-08 Thread Riccardo Ferrari
Hi,

Have you tried to check the "Streaming" tab menu?

Best,

On Tue, Aug 8, 2017 at 4:15 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am running spark streaming job which receives data from azure iot hub. I
> am not sure if the connection was successful and receving any data. does
> the input column show how much data it has read if the connection was
> successful?
>
> [image: Inline image 1]
>


Re: Spark Streaming job statistics

2017-08-08 Thread KhajaAsmath Mohammed
No. will take a look now.

On Tue, Aug 8, 2017 at 1:47 PM, Riccardo Ferrari  wrote:

> Hi,
>
> Have you tried to check the "Streaming" tab menu?
>
> Best,
>
> On Tue, Aug 8, 2017 at 4:15 PM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi,
>>
>> I am running spark streaming job which receives data from azure iot hub.
>> I am not sure if the connection was successful and receving any data. does
>> the input column show how much data it has read if the connection was
>> successful?
>>
>> [image: Inline image 1]
>>
>
>


Unsubscribe

2017-08-08 Thread Benjamin Soemartopo



From: john_test_test 
Sent: Wednesday, August 9, 2017 3:09:44 AM
To: user@spark.apache.org
Subject: speculative execution in spark

Is it possible in any way to take advantage of the already processed portion
of a failed task, so that speculative execution reassigns only what is left
of the original task to another node? If yes, how can I read it from memory?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/speculative-execution-in-spark-tp29042.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Question about 'Structured Streaming'

2017-08-08 Thread Michael Armbrust
Cool stuff! A pattern I have seen is to use our CSV/TSV or JSON support to
read bro logs, rather than a python library. This is likely to have much
better performance since we can do all of the parsing on the JVM without
having to flow it through an external python process.
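
For example, something along these lines (a minimal sketch; the schema, separator,
and path are hypothetical and would need to match the actual Bro log type):

import org.apache.spark.sql.types._

// Hypothetical subset of a Bro conn.log schema; the real log has more fields.
val connSchema = new StructType()
  .add("ts", DoubleType)
  .add("uid", StringType)
  .add("id_orig_h", StringType)
  .add("id_resp_h", StringType)

val connLogs = spark.readStream
  .schema(connSchema)          // file-based streaming sources require an explicit schema
  .option("sep", "\t")         // Bro logs are tab-separated
  .option("comment", "#")      // skip the '#'-prefixed header/metadata lines
  .csv("/path/to/bro/logs")    // parsed on the JVM, no external python process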

On Tue, Aug 8, 2017 at 9:35 AM, Brian Wylie  wrote:

> Hi All,
>
> I've read the new information about Structured Streaming in Spark, looks
> super great.
>
> Resources that I've looked at
> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
> - https://databricks.com/blog/2016/07/28/structured-
> streaming-in-apache-spark.html
> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/
> Structured%20Streaming%20using%20Python%20DataFrames%20API.html
>
> + YouTube videos from Spark Summit 2016/2017
>
> So finally getting to my question:
>
> I have Python code that yields a Python generator... this is a great
> streaming approach within Python. I've used it for network packet
> processing and a bunch of other stuff. I'd love to simply hook up this
> generator (that yields python dictionaries) along with a schema definition
> to create an  'unbounded DataFrame' as discussed in
> https://databricks.com/blog/2016/07/28/structured-
> streaming-in-apache-spark.html
>
> Possible approaches:
> - Make a custom receiver in Python: https://spark.apache.
> org/docs/latest/streaming-custom-receivers.html
> - Use Kafka (this is definitely possible and good but overkill for my use
> case)
> - Send data out a socket and use socketTextStream to pull back in (seems a
> bit silly to me)
> - Other???
>
> Since Python Generators so naturally fit into streaming pipelines I'd
> think that this would be straightforward to 'couple' a python generator
> into a Spark structured streaming pipeline..
>
> I've put together a small notebook just to give a concrete example
> (streaming Bro IDS network data) https://github.com/
> Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb
>
> Any thoughts/suggestions/pointers are greatly appreciated.
>
> -Brian
>
>


[Spark Structured Streaming]: truncated Parquet after driver crash or kill

2017-08-08 Thread dcam
Hello list


We have a Spark application that performs a set of ETLs: reading messages
from a Kafka topic, categorizing them, and writing the contents out as
Parquet files on HDFS. After writing, we are querying the data from HDFS
using Presto's hive integration. We are having problems because the Parquet
files are frequently truncated after the Spark driver is killed or crashes.

The meat of the (Scala) Spark jobs looks like this:
Spark
  .openSession()
  .initKafkaStream("our_topic")
  .filter(...)
  .map(...)
  .coalesce(1)
  .writeStream
  .trigger(ProcessingTime("1 hours"))
  .outputMode("append")
  .queryName("MyETL")
  .format("parquet")
  .option("path", path)
  .start()

Is it expected that Parquet files could be truncated during crashes? 

Sometimes the files are only 4 bytes long, sometimes they are longer but
still too short to be valid Parquet files. Presto detects the short files
and refuses to query the entire table. I hoped the write out of the files
would be transactional, so that incomplete files would not be output.

We can fix crashes as they come up, but we will always need to kill the job
periodically to deploy new versions of the code. We want to run the
application as a long lived process that is continually reading from the
Kafka queue and writing out to HDFS for archival purposes.


Thanks,
Dave Cameron




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Structured-Streaming-truncated-Parquet-after-driver-crash-or-kill-tp29043.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Question about 'Structured Streaming'

2017-08-08 Thread Brian Wylie
I can see your point that you don't really want an external process being
used for the streaming data source. Okay, so on the CSV/TSV front, I have
two follow-up questions:

1) Parsing data/schema creation: The Bro IDS logs have an 8-line header that
contains the 'schema' for the data; each log (http/dns/etc.) will have
different columns with different data types. So would I create a specific
CSV reader inherited from the general one? Also, I'm assuming this would
need to be in Scala/Java? (I suck at both of those :)

2) Dynamic tailing: Do the CSV/TSV data sources support dynamic tailing
and handle log rotations?

Thanks and BTW your Spark Summit talks are really well done and
informative. You're an excellent speaker.

-Brian

On Tue, Aug 8, 2017 at 2:09 PM, Michael Armbrust 
wrote:

> Cool stuff! A pattern I have seen is to use our CSV/TSV or JSON support to
> read bro logs, rather than a python library.  This is likely to have much
> better performance since we can do all of the parsing on the JVM without
> having to flow it though an external python process.
>
> On Tue, Aug 8, 2017 at 9:35 AM, Brian Wylie 
> wrote:
>
>> Hi All,
>>
>> I've read the new information about Structured Streaming in Spark, looks
>> super great.
>>
>> Resources that I've looked at
>> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
>> - https://databricks.com/blog/2016/07/28/structured-streaming-
>> in-apache-spark.html
>> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Stru
>> ctured%20Streaming%20using%20Python%20DataFrames%20API.html
>>
>> + YouTube videos from Spark Summit 2016/2017
>>
>> So finally getting to my question:
>>
>> I have Python code that yields a Python generator... this is a great
>> streaming approach within Python. I've used it for network packet
>> processing and a bunch of other stuff. I'd love to simply hook up this
>> generator (that yields python dictionaries) along with a schema definition
>> to create an  'unbounded DataFrame' as discussed in
>> https://databricks.com/blog/2016/07/28/structured-streaming-
>> in-apache-spark.html
>>
>> Possible approaches:
>> - Make a custom receiver in Python: https://spark.apache.o
>> rg/docs/latest/streaming-custom-receivers.html
>> - Use Kafka (this is definitely possible and good but overkill for my use
>> case)
>> - Send data out a socket and use socketTextStream to pull back in (seems
>> a bit silly to me)
>> - Other???
>>
>> Since Python Generators so naturally fit into streaming pipelines I'd
>> think that this would be straightforward to 'couple' a python generator
>> into a Spark structured streaming pipeline..
>>
>> I've put together a small notebook just to give a concrete example
>> (streaming Bro IDS network data) https://github.com/Kitwa
>> re/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb
>>
>> Any thoughts/suggestions/pointers are greatly appreciated.
>>
>> -Brian
>>
>>
>


Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2017-08-08 Thread dcam
Considering the @transient annotations and the work done in the instance
initializer, not much state is really being broadcast to the executors. It
might be simpler to just create these instances on the executors, rather
than trying to broadcast them?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698p29044.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




How to get the lag of a column in a spark streaming dataframe?

2017-08-08 Thread Prashanth Kumar Murali
I have data streaming into my spark scala application in this format

id    mark1 mark2 mark3 time
uuid1 100   200   300   Tue Aug  8 14:06:02 PDT 2017
uuid1 100   200   300   Tue Aug  8 14:06:22 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:32 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:52 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:58 PDT 2017

I have it read into columns id, mark1, mark2, mark3 and time. The time is
converted to datetime format as well. I want to get this grouped by id and
get the lag for mark1 which gives the previous row's mark1 value. Something
like this:

id    mark1 mark2 mark3 prev_mark time
uuid1 100   200   300   null  Tue Aug  8 14:06:02 PDT 2017
uuid1 100   200   300   100   Tue Aug  8 14:06:22 PDT 2017
uuid2 150   250   350   null  Tue Aug  8 14:06:32 PDT 2017
uuid2 150   250   350   150   Tue Aug  8 14:06:52 PDT 2017
uuid2 150   250   350   150   Tue Aug  8 14:06:58 PDT 2017

Consider the dataframe to be markDF. I have tried:

val window = Window.partitionBy("uuid").orderBy("timestamp")
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

which says non time windows cannot be applied on streaming/appending
datasets/frames.

I have also tried:

val window = Window.partitionBy("uuid").orderBy("timestamp").rowsBetween(-10, 10)
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

This was to get a window over a few rows, which did not work either. The streaming
window, something like window("timestamp", "10 minutes"), cannot be used to
get the lag. I am super confused about how to do this. Any help would be
awesome!!


StructuredStreaming: java.util.concurrent.TimeoutException: Cannot fetch record for offset

2017-08-08 Thread aravias
Hi,
we have a structured streaming app consuming data from kafka and writing to
s3.
I keep getting this timeout exception whenever executors are configured to run
with more than one core per executor. If someone can share any info related to
this, it would be great.
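
For context, the shape of the pipeline described is roughly the following (a minimal
sketch with hypothetical broker, topic, and bucket names, not the actual job):

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // hypothetical brokers
  .option("subscribe", "events")                      // hypothetical topic
  .load()

val query = input
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("parquet")
  .option("path", "s3a://my-bucket/events/")             // hypothetical output path
  .option("checkpointLocation", "s3a://my-bucket/chk/")   // hypothetical checkpoint dir
  .start()

The task failure on the executor looks like this: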



17/08/08 21:58:28 WARN TaskSetManager: Lost task 2.1 in stage 0.0 (TID 21,
ip-10-120-1-44.ec2.internal, executor 6):
java.util.concurrent.TimeoutException: Cannot fetch record for offset
362976936 in 12 milliseconds
at
org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(CachedKafkaConsumer.scala:208)
at
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:158)
at
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:149)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
at
com.homeaway.omnihub.task.SparkStreamingTask.lambda$transform$349866b9$1(SparkStreamingTask.java:113)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:2245)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:2245)
at
org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:186)
at
org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:183)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/StructuredStreaming-java-util-concurrent-TimeoutException-Cannot-fetch-record-for-offset-tp29045.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Question about 'Structured Streaming'

2017-08-08 Thread Michael Armbrust
>
> 1) Parsing data/Schema creation: The Bro IDS logs have a 8 line header
> that contains the 'schema' for the data, each log http/dns/etc will have
> different columns with different data types. So would I create a specific
> CSV reader inherited from the general one?  Also I'm assuming this would
> need to be in Scala/Java? (I suck at both of those :)
>

This is a good question. What I have seen others do is actually run
different streams for the different log types.  This way you can customize
the schema to the specific log type.

Even without using Scala/Java you could also use the text data source
(assuming the logs are newline-delimited) and then write the parser for
each line in python. There will be a performance penalty here though.


> 2) Dynamic Tailing: Does the CSV/TSV data sources support dynamic tailing
> and handle log rotations?
>

The file-based sources work by tracking which files have been processed and
then scanning (optionally using glob patterns) for new files. There are two
assumptions here: files are immutable when they arrive and files always
have a unique name. If files are deleted, we ignore that, so you are okay
to rotate them out.

The full pipeline that I have seen often involves the logs getting uploaded
to something like S3.  This is nice because you get atomic visibility of
files that have already been rotated.  So I wouldn't really call this
dynamic tailing, but we do support looking for new files at some
location.


Reusing dataframes for streaming (spark 1.6)

2017-08-08 Thread Ashwin Raju
Hi,

We've built a batch application on Spark 1.6.1. I'm looking into how to run
the same code as a streaming (DStream based) application. This is using
pyspark.

In the batch application, we have a sequence of transforms that read from
file, do dataframe operations, then write to file. I was hoping to swap out
the read from file with textFileStream, then use the dataframe operations
as is. This would mean that if we change the batch pipeline, so long as it
is a sequence of dataframe operations, the streaming version can just reuse
the code.

Looking at the sql_network_wordcount example, it looks like I'd have to do
DStream.foreachRDD, convert the passed-in RDD into a dataframe and then do
my sequence of dataframe operations. However, that list of dataframe
operations looks to be hardcoded into the process method; is there any way
to pass in a function that takes a dataframe as input and returns a
dataframe?

What I see in the example:

words.foreachRDD(process)

def process(time, rdd):
    # create a dataframe from the RDD
    # hardcoded sequence of operations on the dataframe

What I would like to do instead:

def process(time, rdd):
    # create a dataframe from the RDD -> input_df
    # output_df = dataframe_pipeline_fn(input_df)

-ashwin


Unsubscribe

2017-08-08 Thread Chandrashekhar Vaidya
 

 

From: Sumit Saraswat [mailto:sumitxapa...@gmail.com] 
Sent: Tuesday, August 8, 2017 10:33 PM
To: user@spark.apache.org
Subject: Unsubscribe

 

Unsubscribe


Multiple queries on same stream

2017-08-08 Thread Raghavendra Pandey
I am using structured streaming to evaluate multiple rules on the same
running stream.
I have two options to do that. One is to use foreach and evaluate all the
rules on each row.
The other option is to express the rules in the Spark SQL DSL and run
multiple queries.
I was wondering if option 1 will result in better performance, even though I
can get catalyst optimization in option 2.

Thanks
Raghav


Re: Multiple queries on same stream

2017-08-08 Thread Jörn Franke
This is not easy to say without testing. It depends on the type of computation,
etc. It also depends on the Spark version. Generally, vectorization / SIMD could
be much faster if it is applied by Spark / the JVM in scenario 2.
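
To make the two options concrete, here is a minimal sketch (assuming a hypothetical
streaming DataFrame named events and two hypothetical rules; checkpoint locations
are omitted for brevity):

import org.apache.spark.sql.{ForeachWriter, Row}
import spark.implicits._

// Option 1: a single query; every rule is evaluated per row inside foreach.
events.writeStream.foreach(new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true
  def process(row: Row): Unit = { /* evaluate rule1(row), rule2(row), ... */ }
  def close(errorOrNull: Throwable): Unit = {}
}).start()

// Option 2: one query per rule, each expressed in the DataFrame/SQL DSL
// so Catalyst can optimize it.
val q1 = events.filter($"amount" > 100).writeStream.format("console").start()
val q2 = events.filter($"status" === "error").writeStream.format("console").start()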

> On 9. Aug 2017, at 07:05, Raghavendra Pandey  
> wrote:
> 
> I am using structured streaming to evaluate multiple rules on same running 
> stream. 
> I have two options to do that. One is to use forEach and evaluate all the 
> rules on the row.. 
> The other option is to express rules in spark sql dsl and run multiple 
> queries. 
> I was wondering if option 1 will result in better performance even though I 
> can get catalyst optimization in option 2.
> 
> Thanks 
> Raghav 
