PySpark Structured Streaming - using previous iteration computed results in current iteration

2018-05-16 Thread Ofer Eliassaf
We would like to maintain arbitrary state between invocations of
Structured Streaming iterations in Python.

How can we maintain a static DataFrame that acts as state between the
iterations?

Several options that may be relevant:
1. in Spark memory (distributed across the workers)
2. External Memory solution (e.g. ElasticSearch / Redis)
3. utilizing other state maintenance that can work with PySpark

Specifically: given that in iteration N we get a streaming DataFrame from
Kafka, we apply a computation that produces a label column over the window of
samples from the last hour.
We want to keep around the labels and the sample ids for the next iteration
(N+1) where we want to do a join with the new sample window to inherit the
labels of samples that existed in the previous (N) iteration.
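To pin down the intended logic, here is a plain-Python sketch (no Spark) of the label-inheritance step; the function names and label scheme are hypothetical. In a streaming job the same join could be driven per trigger against state re-read from an external store such as Redis or Elasticsearch (option 2).

```python
# Plain-Python sketch of the label-inheritance logic described above:
# labels computed for window N are kept and joined onto window N+1.
# All names here are illustrative, not a PySpark API.

def compute_label(sample_id):
    # Stand-in for the real labeling computation over the hourly window.
    return f"label-{sample_id % 2}"

def label_window(samples, previous_labels):
    """Assign labels to a window of samples, inheriting from the previous window."""
    labels = {}
    for sample_id in samples:
        if sample_id in previous_labels:
            labels[sample_id] = previous_labels[sample_id]  # inherit from iteration N
        else:
            labels[sample_id] = compute_label(sample_id)    # fresh label
    return labels

window_n = [1, 2, 3]
labels_n = label_window(window_n, previous_labels={})
window_n1 = [2, 3, 4]                      # sample 1 aged out, sample 4 is new
labels_n1 = label_window(window_n1, labels_n)
```

Only the (sample id, label) pairs need to survive between iterations, which is why an external key-value store is a natural fit for this state.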


-- 
Regards,
Ofer Eliassaf


Re: Submit many spark applications

2018-05-16 Thread ayan guha
How about using Livy to submit jobs?

Best Regards,
Ayan Guha


Re: Submit many spark applications

2018-05-16 Thread Marcelo Vanzin
You can either:

- set spark.yarn.submit.waitAppCompletion=false, which will make
spark-submit go away once the app starts in cluster mode.
- use the (new in 2.3) InProcessLauncher class + some custom Java code
to submit all the apps from the same "launcher" process.
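A sketch of the first option, driving spark-submit from Python with the flag set; the jar path and class name are hypothetical, and the actual launch is left commented out since it needs a Spark installation.

```python
# Fire-and-forget submission in YARN cluster mode: with
# spark.yarn.submit.waitAppCompletion=false, spark-submit exits as soon as
# the application is accepted, so one JVM does not linger per running job.

def build_submit_cmd(app_jar, main_class):
    return [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        "--conf", "spark.yarn.submit.waitAppCompletion=false",
        "--class", main_class,
        app_jar,
    ]

cmd = build_submit_cmd("myapp.jar", "com.example.MyApp")
# import subprocess; subprocess.run(cmd, check=True)  # requires spark-submit on PATH
```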

On Wed, May 16, 2018 at 1:45 PM, Shiyuan  wrote:
> Hi Spark-users,
>  I want to submit as many spark applications as the resources permit. I am
> using cluster mode on a yarn cluster.  Yarn can queue and launch these
> applications without problems. The problem lies on spark-submit itself.
> Spark-submit starts a jvm which could fail due to insufficient memory on the
> machine where I run spark-submit if many spark-submit jvm are running. Any
> suggestions on how to solve this problem? Thank you!



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Submit many spark applications

2018-05-16 Thread Shiyuan
Hi Spark-users,
 I want to submit as many Spark applications as the resources permit. I am
using cluster mode on a YARN cluster. YARN can queue and launch these
applications without problems; the problem lies in spark-submit itself.
Spark-submit starts a JVM, which can fail with insufficient memory on the
machine where I run spark-submit when many spark-submit JVMs are running.
Any suggestions on how to solve this problem? Thank you!


Unsubscribe

2018-05-16 Thread varma dantuluri
-- 
Regards,

Varma Dantuluri


Re:

2018-05-16 Thread Nicolas Paris
Hi

I would go for a regular MySQL bulk load, i.e. writing an output that MySQL
can ingest in one pass. I'd say Spark JDBC is fine for small fetches and
loads; for large transfers to an RDBMS, using the database's own optimized
bulk-load API works better than JDBC.
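A minimal sketch of that route, with hypothetical table and path names: produce a CSV (e.g. with Spark's CSV writer) and hand it to MySQL's bulk loader in a single statement. The exact FIELDS/LINES options must match how the CSV was written.

```python
import csv
import io

# Compose MySQL's optimized bulk-load statement instead of doing
# row-by-row JDBC inserts. Path and table name are illustrative.

def load_data_sql(csv_path, table):
    # LOAD DATA [LOCAL] INFILE is MySQL's standard bulk-load path.
    return (
        f"LOAD DATA LOCAL INFILE '{csv_path}' "
        f"INTO TABLE {table} "
        "FIELDS TERMINATED BY ',' ENCLOSED BY '\"' "
        "LINES TERMINATED BY '\\n'"
    )

# Write a tiny CSV in memory to show the expected input shape.
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows([(1, "a"), (2, "b")])

sql = load_data_sql("/tmp/export.csv", "my_table")
```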


Re: How to use StringIndexer for multiple input /output columns in Spark Java

2018-05-16 Thread Bryan Cutler
Yes, the workaround is to create multiple StringIndexers as you described.
OneHotEncoderEstimator is only in Spark 2.3.0; in earlier versions you will
have to use plain OneHotEncoder.
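As a plain-Python illustration (no Spark) of the one-indexer-per-column workaround: each column gets its own fitted mapping, with indices assigned by descending frequency, mirroring StringIndexer's default ordering. The column names and data are made up.

```python
from collections import Counter

# One string indexer fitted per column, mirroring a Pipeline of per-column
# StringIndexer stages. Most frequent value gets index 0; ties broken
# alphabetically for determinism.

def fit_indexer(values):
    freq = Counter(values)
    ordered = sorted(freq, key=lambda v: (-freq[v], v))
    return {v: i for i, v in enumerate(ordered)}

def fit_all(rows, columns):
    return {c: fit_indexer([r[c] for r in rows]) for c in columns}

rows = [
    {"color": "red", "size": "L"},
    {"color": "red", "size": "M"},
    {"color": "blue", "size": "M"},
]
indexers = fit_all(rows, ["color", "size"])
indexed = [{c: indexers[c][r[c]] for c in r} for r in rows]
```

In PySpark 2.2 this corresponds to something like `indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx") for c in cols]` wrapped in `Pipeline(stages=indexers)`.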

On Tue, May 15, 2018, 8:40 AM Mina Aslani  wrote:

> Hi,
>
> So, what is the workaround? Should I create multiple indexer(one for each
> column), and then create pipeline and set stages to have all the
> StringIndexers?
> I am using 2.2.1 as I cannot move to 2.3.0. Looks like
> oneHotEncoderEstimator is broken, please see my email sent today with
> subject:
> OneHotEncoderEstimator - java.lang.NoSuchMethodError: org.apache.spark.sql
> .Dataset.withColumns
>
> Regards,
> Mina
>
> On Tue, May 15, 2018 at 2:37 AM, Nick Pentreath 
> wrote:
>
>> Multi column support for StringIndexer didn’t make it into Spark 2.3.0
>>
>> The PR is still in progress I think - should be available in 2.4.0
>>
>> On Mon, 14 May 2018 at 22:32, Mina Aslani  wrote:
>>
>>> Please take a look at the api doc:
>>> https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html
>>>
>>> On Mon, May 14, 2018 at 4:30 PM, Mina Aslani 
>>> wrote:
>>>
 Hi,

 There is no SetInputCols/SetOutputCols for StringIndexer in Spark java.
 How multiple input/output columns can be specified then?

 Regards,
 Mina

>>>
>>>
>


Structured Streaming Job stops abruptly and No errors logged

2018-05-16 Thread prudhviraj202m
Hello,

I have a structured streaming job that consumes messages from Kafka and does
some stateful associations using flatMapGroupsWithState. Every time I submit
the job, it runs fine for around 2 hours and then stops abruptly without any
error messages. All I can see in the debug logs is the message below, which
seems to be the starting point of the app shutting down.

"INFO SparkContext: Invoking stop() from shutdown hook" 

Apart from this message I do not see any exceptions/errors which lead to the
spark context being stopped. Also, I have been monitoring the JMX metrics of
the app using graphite/prometheus exporter.  At any point of time, the
memory used never goes beyond the available memory.

Can anyone suggest what could be the issue here?

Also, here is the log4j configuration I am using; does it capture all
types of errors, or am I missing something?

log4j.rootCategory=DEBUG, file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p
%c{1}: %m%n

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/path/to/app.log
log4j.appender.serverAccess.DatePattern='.'yyyy-MM-dd
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p
%c{1}: %m%n

log4j.logger.org.apache.spark=DEBUG
log4j.logger.org.apache.spark.repl.Main=DEBUG
log4j.logger.org.spark_project.jetty=DEBUG
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=DEBUG
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=DEBUG
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=DEBUG
log4j.logger.org.apache.parquet=DEBUG
log4j.logger.parquet=DEBUG 

- Prudhvi





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re:

2018-05-16 Thread Vadim Semenov
Upon downsizing to 20 partitions some of your partitions become too big,
and I see that you're doing caching, and executors try to write big
partitions to disk, but fail because they exceed 2GiB

> Caused by: java.lang.IllegalArgumentException: Size exceeds
Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
apply(DiskStore.scala:125)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
apply(DiskStore.scala:124)

You can try to coalesce to 100 and reduce the number of executors to keep
the load on MySQL reasonable
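The arithmetic behind this advice can be sketched directly; the data sizes below are illustrative, not taken from the thread.

```python
# A cached block is memory-mapped into a single buffer, which caps it at
# Integer.MAX_VALUE bytes (~2 GiB) -- hence "Size exceeds Integer.MAX_VALUE".

INT_MAX = 2**31 - 1  # ~2 GiB cap per block

def min_partitions(total_bytes, safety=0.5):
    # Target partitions well under the cap; the safety factor absorbs skew.
    limit = int(INT_MAX * safety)
    return -(-total_bytes // limit)  # ceiling division

total = 150 * 2**30             # suppose the cached dataframe is ~150 GiB
partitions_20 = total // 20     # ~7.5 GiB per partition: far over the cap
needed = min_partitions(total)  # enough partitions to stay under ~1 GiB each
```

Note also that the snippet in the quoted question discards the result of `df.coalesce(20)`: `coalesce` returns a new DataFrame, so the repartitioned frame has to be reassigned (e.g. `df = df.coalesce(100)`) before calling `write`.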

On Wed, May 16, 2018 at 5:36 AM, Davide Brambilla <
davide.brambi...@contentwise.tv> wrote:

> Hi all,
>we have a dataframe with 1000 partitions and we need to write the
> dataframe into a MySQL using this command:
>
> df.coalesce(20)
> df.write.jdbc(url=url,
>   table=table,
>   mode=mode,
>   properties=properties)
>
> and we get this errors randomly
>
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
> apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
> apply(DiskStore.scala:124)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
> at org.apache.spark.storage.BlockManager.getLocalValues(
> BlockManager.scala:520)
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(
> BlockManager.scala:753)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1690)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1678)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1677)
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1677)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1905)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1860)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1849)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
> at 

Dataframe save-as-table operation is failing when child column names contain special characters

2018-05-16 Thread abhijeet bedagkar
Hi,

I am using SPARK to read the XML / JSON files to create a dataframe and
save it as a hive table

Sample XML file:

<record>
  <id>101</id>
  <testexecutioncontroller>
    <validation-timeout>45</validation-timeout>
    <execution-method>COMMAND</execution-method>
  </testexecutioncontroller>
</record>

Note the field 'validation-timeout' under testexecutioncontroller.

Below is the schema populated by DF after reading the XML file

|-- id: long (nullable = true)
|-- testexecutioncontroller: struct (nullable = true)
||-- execution-timeout: long (nullable = true)
||-- execution-method: string (nullable = true)

While saving this dataframe to hive table I am getting below exception

Caused by: java.lang.IllegalArgumentException: Error: : expected at the
position 24 of
'bigint:struct' but '-'
is found.
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:360)
  at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:331)
  at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseType(TypeInfoUtils.java:483)
  at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:305)
  at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfosFromTypeString(TypeInfoUtils.java:765)
  at
org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.initialize(ParquetHiveSerDe.java:111)
  at
org.apache.hadoop.hive.serde2.AbstractSerDe.initialize(AbstractSerDe.java:53)
  at
org.apache.hadoop.hive.serde2.SerDeUtils.initializeSerDe(SerDeUtils.java:521)
  at
org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:391)
  at
org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:276)
  at
org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:197)
  at org.apache

It looks like the issue is caused by the special character '-' in the
field name, since after removing the special character everything works
properly.

Could you please let me know if there is a way to rename all child columns
so that the dataframe can be saved as a table without any issue.

Creating the StructField from df.schema and recursively creating another
StructField with renamed columns is one solution I am aware of, but I
wanted to check whether there is an easier way to do this.
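For what it's worth, the recursive walk itself is small; here is a sketch using plain dicts as a stand-in for StructType/StructField, with field names taken from the example above.

```python
# Walk a nested schema and replace '-' in every field name, at every depth.
# The dict shape here is only a stand-in for StructType/StructField.

def sanitize(name):
    return name.replace("-", "_")

def rename_fields(schema):
    if isinstance(schema, dict):  # a struct: {field_name: child_type}
        return {sanitize(k): rename_fields(v) for k, v in schema.items()}
    return schema  # a leaf type such as "long" or "string"

schema = {
    "id": "long",
    "testexecutioncontroller": {
        "validation-timeout": "long",
        "execution-method": "string",
    },
}
clean = rename_fields(schema)
```

In PySpark the same traversal can be applied to `df.schema` to build a renamed StructType before saving, which avoids Hive ever seeing the hyphenated names.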

Thanks,
Abhijeet


Re: Structured Streaming, Reading and Updating a variable

2018-05-16 Thread Martin Engen
I have been testing some with aggregations, but I seem to hit a wall on two
issues. Example:

val avg = areaStateDf.groupBy($"plantKey").avg("sensor")

1) How can I use the result from an aggregation within the same stream, to do
further calculations?
2) It seems to be very slow if I want a moving window of 24 hours with a
moving average on some calculations within it. When testing locally with
using

The Accumulator issue:
Simple Counter Accumulator:
object Test {
  private val spark = SparkHelper.getSparkSession()
  import spark.implicits._
  import com.datastax.spark.connector._
  val counter = spark.sparkContext.longAccumulator("counter")

  val fetchData = () => {
counter.add(2)
counter.value
  }

  val fetchdataUDF = spark.sqlContext.udf.register("testUDF", fetchData)

  def calculate(areaStateDf: DataFrame): StreamingQuery = {
import spark.implicits._
val ds = areaStateDf.select($"areaKey").withColumn("fetchedData", 
fetchdataUDF())
KafkaSinks.debugStream(ds, "volumTest")
  }
}
I would create a custom accumulator to include a smoothing algorithm, but I
can't seem to get even a plain counter working.
This works locally, but on the server running Docker (with one master and
one worker) it throws this error:

18/05/16 08:35:22 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 
(TID 3, 172.17.0.5, executor 0): java.lang.ExceptionInInitializerError
at com.client.spark.calculations.Test$$anonfun$1.apply(ThpLoad1.scala:24)
at com.client.spark.calculations.Test$$anonfun$1.apply(ThpLoad1.scala:15)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:235)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: A master URL must be set in your 
configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:376)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)
at com.cambi.assurance.spark.SparkHelper$.getSparkSession(SparkHelper.scala:28)
at com.client.spark.calculations.Test$.<init>(ThpLoad1.scala:10)
at com.client.spark.calculations.Test$.<clinit>(ThpLoad1.scala)
... 18 more

18/05/16 08:35:22 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 3.0 
(TID 4, 172.17.0.5, executor 0): java.lang.NoClassDefFoundError: Could not 
initialize class com.client.spark.calculations.Test$
at com.client.spark.calculations.Test$$anonfun$1.apply(ThpLoad1.scala:24)
at com.client.spark.calculations.Test$$anonfun$1.apply(ThpLoad1.scala:15)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:235)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at 

Re:

2018-05-16 Thread Davide Brambilla
Hi,
   we have 2 million rows in total, on an EMR cluster with 8 m4.4xlarge
machines and 100 GB EBS storage.


Davide B.

Davide Brambilla
ContentWise R
ContentWise
davide.brambi...@contentwise.tv
phone: +39 02 49517001 mobile: 345 71 13 800
ContentWise SrL - Via Schiaffino, 11 - 20158 Milano (MI) – ITALY

On Wed, May 16, 2018 at 11:41 AM, Jörn Franke  wrote:

> How many rows do you have in total?

OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-16 Thread weand
We implemented a streaming query with aggregation on event time with a
watermark. I'm wondering why the aggregation state is not cleaned up.
According to the documentation, old aggregation state should be cleared when
using watermarks, and we don't meet any of the conditions [1] under which
state would not be cleaned up.

We do something like this:

event_schema = T.StructType([
T.StructField("remote_ip", T.StringType(), True),
T.StructField("username", T.StringType(), True)
])

stream_writer = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input_topic")
.option("startingOffsets", "earliest")
.load()
.withColumn("key", F.col("key").cast(T.StringType()))
.withColumn("value", F.col("value").cast(T.StringType()))
.withColumn("event", F.from_json(F.col("value"), event_schema))
.select("timestamp", "event.*")
.where("username rlike '[^@]+@[^\.]\..+'")
.withWatermark("timestamp", "600 seconds")
.groupBy(
F.window("timestamp", "600 seconds", "30 seconds"),
F.col("remote_ip")
)
.agg(
F.approx_count_distinct("username").alias("distinct_username"),
F.collect_set("username").alias("all_usernames"),
)
.where(F.expr("distinct_username >= 2"))
.select("remote_ip")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output_topic")
.option("checkpointLocation", "hdfs://...")
.trigger(processingTime="10 seconds")
.outputMode("update")
.start()
)

When running this for several hours we hit a heap OutOfMemoryError in our
driver application.
 

Analyzing the heap dump reveals:
One instance of "org.apache.spark.status.ElementTrackingStore", loaded by
"sun.misc.Launcher$AppClassLoader @ 0x800223d0", occupies 1,793,945,416
(93.59%) bytes. The memory is accumulated in one instance of
"java.util.concurrent.ConcurrentHashMap$Node[]".


I would expect memory to peak only over the window duration (10 minutes in
our case), but it seems the event state is never cleaned up.
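For reference, the eviction behavior the query expects can be simulated without Spark. This is only an illustration of the documented rule (state for a window is dropped once the watermark passes the window's end), not Spark's implementation; event times are made up.

```python
# Simulate watermark-driven state cleanup: the watermark is the max event
# time seen minus the configured delay, and windows whose end has fallen
# behind the watermark are evicted.

WATERMARK_DELAY = 600  # seconds, as in the query above
WINDOW = 600           # window length in seconds

state = {}             # window_start -> set of usernames seen in that window
max_event_time = 0

def update(event_time, window_start, username):
    global max_event_time
    state.setdefault(window_start, set()).add(username)
    max_event_time = max(max_event_time, event_time)
    watermark = max_event_time - WATERMARK_DELAY
    # evict windows that can no longer receive late data
    for ws in [w for w in state if w + WINDOW < watermark]:
        del state[ws]

update(0, 0, "a@example.com")
update(2000, 1800, "b@example.com")  # watermark moves to 1400; window [0, 600) is dropped
```

If the driver heap grows even though per-window state is evicted like this on the executors, the leak may be elsewhere (here, the `ElementTrackingStore` in the heap dump is driver-side UI/status state rather than streaming aggregation state).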

Any ideas?

Regards
Andreas

[1]
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#conditions-for-watermarking-to-clean-aggregation-state



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re:

2018-05-16 Thread Jörn Franke
How many rows do you have in total?

> On 16. May 2018, at 11:36, Davide Brambilla  
> wrote:
> 
> Hi all,
>we have a dataframe with 1000 partitions and we need to write the 
> dataframe into a MySQL using this command:
> 
> df.coalesce(20)
> df.write.jdbc(url=url,
>   table=table,
>   mode=mode,
>   properties=properties)
> 
> and we get this errors randomly
> 
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> [stack trace trimmed; quoted in full in the original message below]

[no subject]

2018-05-16 Thread Davide Brambilla
Hi all,
   we have a dataframe with 1000 partitions and we need to write the
dataframe into MySQL using this command:

df.coalesce(20)
df.write.jdbc(url=url,
  table=table,
  mode=mode,
  properties=properties)

and we get these errors randomly

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:125)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:124)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
at
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:520)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:753)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:446)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
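A side note on the snippet in this message: coalesce() returns a new DataFrame rather than modifying df in place, so as written the JDBC write still runs over all 1000 partitions. A sketch of the presumably intended form (an aside; it does not by itself explain the error):

```python
def write_coalesced(df, url, table, mode, properties, num_partitions=20):
    """coalesce() returns a NEW DataFrame; chain it into the write
    (or reassign df) so the reduced partition count takes effect."""
    (df.coalesce(num_partitions)
       .write
       .jdbc(url=url, table=table, mode=mode, properties=properties))
```

On the error itself: "Size exceeds Integer.MAX_VALUE" suggests a single block grew past 2 GB, and since coalesce merges partitions without a shuffle, reducing 1000 partitions to 20 large ones may make that worse; repartition(20), which shuffles data evenly, could be the safer choice here.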

Re: [Java] impact of java 10 on spark dev

2018-05-16 Thread Jörn Franke
The first requirement would be that Scala supports them. Beyond that, someone 
would need to redesign the Spark source code to leverage modules - this could 
be a rather handy feature: a small but very well designed core (core, 
ml, graph, etc.) around which others write useful modules.

> On 16. May 2018, at 08:20, xmehaut  wrote:
> 
> Hello,
> 
> I would like to know what the impacts of Java 10+ on Spark could be. I know
> that Spark is written in Scala, but the latest versions of Java include many
> improvements, especially in the JVM and in the delivery process (modules,
> JIT, memory management, ...) which could benefit Spark.
> 
> regards
> 
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [structured-streaming][kafka] Will the Kafka readstream timeout after connections.max.idle.ms 540000 ms ?

2018-05-16 Thread Shixiong(Ryan) Zhu
The streaming query should keep polling data from Kafka. When the query was
stopped, did you see any exception?

Best Regards,
Shixiong Zhu
Databricks Inc.
shixi...@databricks.com

databricks.com





On Tue, May 15, 2018 at 6:25 PM, karthikjay  wrote:

> Hi all,
>
> We are running into a scenario where the structured streaming job is
> exiting after a while, specifically when the Kafka topic is not getting
> any data. From the job logs, I see connections.max.idle.ms = 540000. Does
> that mean the spark readstream will close when it does not get data for
> 540000 milliseconds? If yes, how do I override this?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>
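For reference, the source's consumer settings can be adjusted from Spark: read options prefixed with "kafka." are forwarded to the underlying Kafka consumer, and 540000 ms (9 minutes) is Kafka's default for connections.max.idle.ms; an idle connection being closed should normally be reopened rather than stop the query. A sketch with illustrative names:

```python
def kafka_stream_options(bootstrap_servers, topic, max_idle_ms=540000):
    """Builds the option dict for spark.readStream.format("kafka").
    Keys prefixed with "kafka." are forwarded to the Kafka consumer;
    option values must be strings."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # raise (or lower) the consumer's idle-connection timeout
        "kafka.connections.max.idle.ms": str(max_idle_ms),
    }
```

Usage would look like spark.readStream.format("kafka").options(**kafka_stream_options("localhost:9092", "events", max_idle_ms=3600000)).load(), assuming a running SparkSession.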


Re: Continuous Processing mode behaves differently from Batch mode

2018-05-16 Thread Shixiong(Ryan) Zhu
One possible case is you don't have enough resources to launch all tasks
for your continuous processing query. Could you check the Spark UI and see
if all tasks are running rather than waiting for resources?

Best Regards,
Shixiong Zhu
Databricks Inc.
shixi...@databricks.com

databricks.com





On Tue, May 15, 2018 at 5:38 PM, Yuta Morisawa  wrote:

> Hi all
>
> Now I am using Structured Streaming in Continuous Processing mode and I
> faced an odd problem.
>
> My code is so simple that it is similar to the sample code on the
> documentation.
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing
>
>
> When I send the same text data ten times, for example 10 lines of text, in
> Batch mode the result has 100 lines.
>
> But in Continuous Processing mode the result has only 10 lines.
> It appears duplicated lines are removed.
>
> The only difference between the two programs is the presence of the
> trigger method.
>
> Why do these two programs behave differently?
>
>
> --
> Regard,
> Yuta
>
>
>
>
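For context, the only code-level change between the two modes is the trigger argument, as the helper below illustrates. Continuous processing is experimental in Spark 2.3 and supports only a restricted set of operations (map-like projections and selections), which may be related to the differing results.

```python
def trigger_kwargs(interval="1 second", continuous=False):
    """Returns the kwargs for DataStreamWriter.trigger():
    micro-batch mode uses processingTime, continuous mode uses continuous."""
    return {"continuous": interval} if continuous else {"processingTime": interval}
```

Usage would look like df.writeStream.format("console").trigger(**trigger_kwargs(continuous=True)).start(), assuming a streaming DataFrame df.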


[Java] impact of java 10 on spark dev

2018-05-16 Thread xmehaut
Hello,

I would like to know what the impacts of Java 10+ on Spark could be. I know
that Spark is written in Scala, but the latest versions of Java include many
improvements, especially in the JVM and in the delivery process (modules,
JIT, memory management, ...) which could benefit Spark.

regards




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org