Re: Error while doing stream-stream inner join (java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access)

2018-07-03 Thread kant kodali
I am currently using Spark 2.3.0. Will try it with 2.3.1.

On Tue, Jul 3, 2018 at 3:12 PM, Shixiong(Ryan) Zhu 
wrote:

> Which version are you using? There is a known issue regarding this, and it
> should be fixed in 2.3.1. See
> https://issues.apache.org/jira/browse/SPARK-23623 for details.
>
> Best Regards,
> Ryan
>
> On Mon, Jul 2, 2018 at 3:56 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> I get the below error quite often when I do a stream-stream inner join
>> on two data frames. After running through several experiments, stream-stream
>> joins don't look stable enough for production yet. Any advice on this?
>>
>> Thanks!
>>
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access
>> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
>> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
>> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
>> org.apache.spark.sql.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:301)
>>
>
>


Re: Error while doing stream-stream inner join (java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access)

2018-07-03 Thread Shixiong(Ryan) Zhu
Which version are you using? There is a known issue regarding this, and it
should be fixed in 2.3.1. See
https://issues.apache.org/jira/browse/SPARK-23623 for details.

Best Regards,
Ryan

On Mon, Jul 2, 2018 at 3:56 AM, kant kodali  wrote:

> Hi All,
>
> I get the below error quite often when I do a stream-stream inner join on
> two data frames. After running through several experiments, stream-stream
> joins don't look stable enough for production yet. Any advice on this?
>
> Thanks!
>
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:301)
>


Re: Error while doing stream-stream inner join (java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access)

2018-07-03 Thread Jungtaek Lim
Could you please describe the version of Spark, and how you ran your app? If
you don't mind sharing a minimal app that can reproduce this, it would be
really great.

- Jungtaek Lim (HeartSaVioR)
On Mon, 2 Jul 2018 at 7:56 PM kant kodali  wrote:

> Hi All,
>
> I get the below error quite often when I do a stream-stream inner join on
> two data frames. After running through several experiments, stream-stream
> joins don't look stable enough for production yet. Any advice on this?
>
> Thanks!
>
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
> 18/07/02 09:32:14 INFO LineBufferedStream: stdout: at
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:301)
>
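
For reference, a minimal PySpark stream-stream inner join that exercises this
code path might look like the sketch below (topic names, servers, and the
checkpoint path are placeholders, and it assumes the spark-sql-kafka-0-10
package is on the classpath):

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("stream-stream-join-repro").getOrCreate()

def kafka_stream(topic, key_alias, ts_alias):
    # Read a Kafka topic as a streaming DataFrame, keeping key and event time
    return (spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS " + key_alias,
                        "timestamp AS " + ts_alias))

left = kafka_stream("topic1", "k1", "ts1").withWatermark("ts1", "10 minutes")
right = kafka_stream("topic2", "k2", "ts2").withWatermark("ts2", "10 minutes")

# Inner join on key, constrained to a time range so join state can be dropped
joined = left.join(
    right,
    expr("k1 = k2 AND ts2 BETWEEN ts1 - INTERVAL 5 minutes AND ts1 + INTERVAL 5 minutes"))

query = (joined.writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/join-checkpoint")
         .start())
query.awaitTermination()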


Number of records per micro-batch in DStream vs Structured Streaming

2018-07-03 Thread subramgr
Hi, 

We have two Spark streaming jobs, one using DStreams and the other using
Structured Streaming. I have observed that the number of records per
micro-batch (per trigger in the case of Structured Streaming) is not the same
between the two jobs. The Structured Streaming job has higher numbers
compared to the DStream job.

Is there any documentation or blog post on how they differ, and is there a
different strategy for consuming data from Kafka? I know both use the Kafka
direct approach.

The trigger was set to 60 seconds in Structured Streaming, and the batch
interval was 60 seconds as well for the DStream job.
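
For reference, the two settings described above look roughly like the sketch
below (stream_df, sc, and the sink are placeholders). One difference worth
checking is that the knobs capping records per batch differ between the two
APIs: the Structured Streaming Kafka source uses the maxOffsetsPerTrigger
option, while the direct DStream approach uses
spark.streaming.kafka.maxRatePerPartition, so an uncapped Structured Streaming
source may pull everything available per trigger.

# Structured Streaming: one micro-batch per 60-second trigger
query = (stream_df.writeStream
         .trigger(processingTime="60 seconds")
         .format("console")
         .start())

# DStreams: 60-second batch interval set on the StreamingContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, batchDuration=60)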

Thanks







Re: [Structured Streaming] Metrics or logs of events that are ignored due to watermark

2018-07-03 Thread subramgr
Thanks






Re: Building SparkML vectors from long data

2018-07-03 Thread Patrick McCarthy
I'm still validating my results, but my solution for the moment looks like
the code below. I'm presently dealing with one-hot encoded values, so all the
numbers in my array are 1:

from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql import functions as F

def udfMaker(feature_len):
    # sorted(x): SparseVector expects indices in ascending order
    return F.udf(lambda x: SparseVector(feature_len, sorted(x), [1.0] * len(x)),
                 VectorUDT())

indexer = StringIndexer(inputCol='contentStrings',
                        outputCol='indexedContent').fit(source_df)

makeVec = udfMaker(len(indexer.labels))

indexed_data = indexer.transform(source_df)

sparse_content = (indexed_data.groupBy('ID')
                  .agg(F.collect_set('indexedContent').alias('contentIdx'))
                  .withColumn('content', makeVec(F.col('contentIdx')))
                  .drop('contentIdx'))
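
To illustrate (hypothetical data, not from the original post), with an input
like

+---+----------------+
| ID| contentStrings |
+---+----------------+
|  A| v1             |
|  A| v2             |
|  B| v1             |
+---+----------------+

the resulting 'content' column holds one SparseVector per ID, e.g.
SparseVector(2, [0, 1], [1.0, 1.0]) for A and SparseVector(2, [0], [1.0]) for
B, assuming StringIndexer assigns index 0 to v1 and 1 to v2.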

On Tue, Jun 12, 2018 at 3:59 PM, Nathan Kronenfeld <
nkronenfeld@uncharted.software> wrote:

> I don't know if this is the best way or not, but:
>
> val indexer = new StringIndexer().setInputCol("vr").setOutputCol("vrIdx")
> val indexModel = indexer.fit(data)
> val indexedData = indexModel.transform(data)
> val variables = indexModel.labels.length
>
> val toSeq = udf((a: Double, b: Double) => Seq(a, b))
> val toVector = udf((seq: Seq[Seq[Double]]) => {
>   new SparseVector(variables, seq.map(_(0).toInt).toArray, seq.map(_(1)).toArray)
> })
> val result = indexedData
>   .withColumn("val", toSeq(col("vrIdx"), col("value")))
>   .groupBy("ID")
>   .agg(collect_set(col("val")).name("collected_val"))
>   .withColumn("collected_val",
>     toVector(col("collected_val")).as[Row](Encoders.javaSerialization(classOf[Row])))
>
>
> at least works.  The indices still aren't in order in the vector - I don't
> know if this matters much, but if it does, it's easy enough to sort them in
> toVector (and to remove duplicates)
>
>
> On Tue, Jun 12, 2018 at 2:24 PM, Patrick McCarthy  wrote:
>
>> I work with a lot of data in a long format, cases in which an ID column
>> is repeated, followed by a variable and a value column like so:
>>
>> +---+-+---+
>> |ID | var | value |
>> +---+-+---+
>> | A | v1  | 1.0   |
>> | A | v2  | 2.0   |
>> | B | v1  | 1.5   |
>> | B | v3  | -1.0  |
>> +---+-+---+
>>
>> It seems to me that Spark doesn't provide any clear native way to
>> transform data of this format into a Vector() or VectorUDT() type suitable
>> for machine learning algorithms.
>>
>> The best solution I've found so far (which isn't very good) is to group
>> by ID, perform a collect_list, and then use a UDF to translate the
>> resulting array into a vector datatype.
>>
>> I can get kind of close like so:
>>
>> indexer = MF.StringIndexer(inputCol = 'var', outputCol = 'varIdx')
>>
>> (indexed_df
>>  .withColumn('val', F.concat(F.col('varIdx').astype(T.IntegerType()).astype(T.StringType()),
>>                              F.lit(':'), F.col('value')))
>>  .groupBy('ID')
>>  .agg(F.collect_set('val'))
>> )
>>
>> But the resultant 'val' vector is out of index order, and still would
>> need to be parsed.
>>
>> What's the current preferred way to solve a problem like this?
>>
>
>


Re: [G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Aakash Basu
Thanks a ton!

On Tue, Jul 3, 2018 at 6:26 PM, Vadim Semenov  wrote:

> As with typical `JAVA_OPTS`, you need to pass these as a single parameter:
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-ResizePLAB"
>
> Also, you've got an extra space in the parameter; there should be no space
> after the colon.
> On Tue, Jul 3, 2018 at 3:01 AM Aakash Basu 
> wrote:
> >
> > Hi,
> >
> > I used the below in the Spark Submit for using G1GC -
> >
> > --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"
> >
> > Now, I want to use -XX: -ResizePLAB of G1GC to avoid performance
> > degradation caused by a large number of thread communications.
> >
> > How can I do it? I tried submitting in a similar fashion -
> >
> > --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf
> "spark.executor.extraJavaOptions=-XX: -ResizePLAB", but it doesn't work.
> >
> > Thanks,
> > Aakash.
>
>
>
> --
> Sent from my iPhone
>


Re: Inferring Data driven Spark parameters

2018-07-03 Thread Vadim Semenov
You can't change the executor/driver cores/memory on the fly once
you've already started a Spark Context.
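
In other words, these have to be fixed at submit time or when the session is
first created, e.g. (a sketch with placeholder values):

from pyspark.sql import SparkSession

# Must be set before the SparkContext exists; changing them afterwards on a
# running context has no effect
spark = (SparkSession.builder
         .config("spark.executor.instances", "4")
         .config("spark.executor.cores", "5")
         .config("spark.executor.memory", "10g")
         .getOrCreate())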
On Tue, Jul 3, 2018 at 4:30 AM Aakash Basu  wrote:
>
> We aren't using Oozie or anything similar. Moreover, the end-to-end job will
> be exactly the same, but the data will be extremely different (number of
> continuous and categorical columns, vertical size, horizontal size, etc.).
> Hence, if there were a way to calculate the parameters from the data and
> derive the respective configuration automatically, it would be great.
>
> On Tue, Jul 3, 2018 at 1:09 PM, Jörn Franke  wrote:
>>
>> Don't do this in your job. Create different jobs for the different job
>> types and orchestrate them using Oozie or something similar.
>>
>> On 3. Jul 2018, at 09:34, Aakash Basu  wrote:
>>
>> Hi,
>>
>> Cluster - 5 node (1 Driver and 4 workers)
>> Driver Config: 16 cores, 32 GB RAM
>> Worker Config: 8 cores, 16 GB RAM
>>
>> I'm using the below parameters from which I know the first chunk is cluster 
>> dependent and the second chunk is data/code dependent.
>>
>> --num-executors 4
>> --executor-cores 5
>> --executor-memory 10G
>> --driver-cores 5
>> --driver-memory 25G
>>
>>
>> --conf spark.sql.shuffle.partitions=100
>> --conf spark.driver.maxResultSize=2G
>> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
>> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
>>
>> I've arrived at these values based on my R&D on these properties and the
>> issues I faced, hence these handles.
>>
>> My ask here is:
>>
>> 1) How can I infer, using some formula or code, the values for the second
>> (data/code-dependent) chunk above?
>> 2) What other properties/configurations can I use to shorten my job
>> runtime?
>>
>> Thanks,
>> Aakash.
>
>


-- 
Sent from my iPhone




Re: [G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Vadim Semenov
As with typical `JAVA_OPTS`, you need to pass these as a single parameter:

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-ResizePLAB"

Also, you've got an extra space in the parameter; there should be no space
after the colon.
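
For example, the full spark-submit (hypothetical class and jar names) would
carry both flags in one quoted value:

spark-submit \
  --master yarn \
  --class com.example.MyApp \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-ResizePLAB" \
  my-app.jar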
On Tue, Jul 3, 2018 at 3:01 AM Aakash Basu  wrote:
>
> Hi,
>
> I used the below in the Spark Submit for using G1GC -
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"
>
> Now, I want to use -XX: -ResizePLAB of G1GC to avoid performance
> degradation caused by a large number of thread communications.
>
> How can I do it? I tried submitting in a similar fashion -
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf 
> "spark.executor.extraJavaOptions=-XX: -ResizePLAB", but it doesn't work.
>
> Thanks,
> Aakash.



-- 
Sent from my iPhone




Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-03 Thread Chetan Khatri
Hello Dear Spark User / Dev,

I would like to pass a Python user-defined function to a Spark job developed
in Scala, with the return value of that function fed back into the DataFrame /
Dataset API.

Can someone please guide me on the best approach to do this? The Python
function would mostly be a transformation function. I would also like to pass
a Java function as a string to the Spark / Scala job, have it applied to an
RDD / DataFrame, and get an RDD / DataFrame back.

Thank you.
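
One approach sometimes used for this (not discussed in this thread, so treat
it as a sketch) is RDD.pipe, where the Scala job streams each record through
an external Python process, e.g. rdd.pipe("python transform.py"), and reads
the transformed lines back as an RDD. A minimal transform.py for that pattern
(the comma-separated record layout is a made-up example) could be:

#!/usr/bin/env python
# transform.py: reads one record per line from stdin, writes the transformed
# record to stdout; invoked from the Scala job via rdd.pipe(...)
import sys

for line in sys.stdin:
    fields = line.rstrip("\n").split(",")  # hypothetical CSV-like layout
    if len(fields) > 1:
        fields[1] = fields[1].upper()      # example transformation
    sys.stdout.write(",".join(fields) + "\n")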


Re: Inferring Data driven Spark parameters

2018-07-03 Thread Aakash Basu
We aren't using Oozie or anything similar. Moreover, the end-to-end job will
be exactly the same, but the data will be extremely different (number of
continuous and categorical columns, vertical size, horizontal size, etc.).
Hence, if there were a way to calculate the parameters from the data and
derive the respective configuration automatically, it would be great.

On Tue, Jul 3, 2018 at 1:09 PM, Jörn Franke  wrote:

> Don't do this in your job. Create different jobs for the different job types
> and orchestrate them using Oozie or something similar.
>
> On 3. Jul 2018, at 09:34, Aakash Basu  wrote:
>
> Hi,
>
> Cluster - 5 node (1 Driver and 4 workers)
> Driver Config: 16 cores, 32 GB RAM
> Worker Config: 8 cores, 16 GB RAM
>
> I'm using the below parameters from which I know the first chunk is
> cluster dependent and the second chunk is data/code dependent.
>
> --num-executors 4
> --executor-cores 5
> --executor-memory 10G
> --driver-cores 5
> --driver-memory 25G
>
>
> --conf spark.sql.shuffle.partitions=100
> --conf spark.driver.maxResultSize=2G
> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
>
> I've arrived at these values based on my R&D on these properties and the
> issues I faced, hence these handles.
>
> My ask here is:
>
> *1) How can I infer, using some formula or code, the values for the second
> (data/code-dependent) chunk above?*
> *2) What other properties/configurations can I use to shorten my job
> runtime?*
>
> Thanks,
> Aakash.
>
>


Re: Inferring Data driven Spark parameters

2018-07-03 Thread Jörn Franke
Don't do this in your job. Create different jobs for the different job types
and orchestrate them using Oozie or something similar.

> On 3. Jul 2018, at 09:34, Aakash Basu  wrote:
> 
> Hi,
> 
> Cluster - 5 node (1 Driver and 4 workers)
> Driver Config: 16 cores, 32 GB RAM
> Worker Config: 8 cores, 16 GB RAM
> 
> I'm using the below parameters from which I know the first chunk is cluster 
> dependent and the second chunk is data/code dependent.
> 
> --num-executors 4 
> --executor-cores 5
> --executor-memory 10G 
> --driver-cores 5 
> --driver-memory 25G 
> 
> 
> --conf spark.sql.shuffle.partitions=100 
> --conf spark.driver.maxResultSize=2G 
> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC" 
> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
> 
> I've arrived at these values based on my R&D on these properties and the
> issues I faced, hence these handles.
>
> My ask here is:
>
> 1) How can I infer, using some formula or code, the values for the second
> (data/code-dependent) chunk above?
> 2) What other properties/configurations can I use to shorten my job
> runtime?
> 
> Thanks,
> Aakash.


Inferring Data driven Spark parameters

2018-07-03 Thread Aakash Basu
Hi,

Cluster - 5 node (1 Driver and 4 workers)
Driver Config: 16 cores, 32 GB RAM
Worker Config: 8 cores, 16 GB RAM

I'm using the below parameters from which I know the first chunk is cluster
dependent and the second chunk is data/code dependent.

--num-executors 4
--executor-cores 5
--executor-memory 10G
--driver-cores 5
--driver-memory 25G


--conf spark.sql.shuffle.partitions=100
--conf spark.driver.maxResultSize=2G
--conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
--conf spark.scheduler.listenerbus.eventqueue.capacity=2

I've arrived at these values based on my R&D on these properties and the
issues I faced, hence these handles.

My ask here is:

*1) How can I infer, using some formula or code, the values for the second
(data/code-dependent) chunk above?*
*2) What other properties/configurations can I use to shorten my job
runtime?*

Thanks,
Aakash.
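
For question 1), a rough rule-of-thumb sketch (these heuristics are common
community defaults, not an official formula, and the function below is made up
for illustration):

# Rough sizing heuristics: ~5 cores per executor, one core and ~1 GB per node
# reserved for the OS/daemons, ~10% of executor memory set aside for overhead,
# and shuffle partitions scaled to total cores and input size.
def suggest_params(worker_nodes, cores_per_node, mem_per_node_gb,
                   input_size_gb, target_partition_mb=128):
    usable_cores = cores_per_node - 1
    executors_per_node = max(1, usable_cores // 5)
    executor_cores = usable_cores // executors_per_node
    num_executors = worker_nodes * executors_per_node
    executor_memory_gb = int(((mem_per_node_gb - 1) / executors_per_node) * 0.9)
    shuffle_partitions = max(num_executors * executor_cores * 2,
                             int(input_size_gb * 1024 / target_partition_mb))
    return {"num_executors": num_executors,
            "executor_cores": executor_cores,
            "executor_memory_gb": executor_memory_gb,
            "shuffle_partitions": shuffle_partitions}

# e.g. for the cluster described above, assuming ~50 GB of input data
print(suggest_params(worker_nodes=4, cores_per_node=8,
                     mem_per_node_gb=16, input_size_gb=50))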


[G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Aakash Basu
Hi,

I used the below in the Spark Submit for using G1GC -

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"

Now, I want to use *-XX: -ResizePLAB* of G1GC to avoid performance
degradation caused by a large number of thread communications.

How can I do it? I tried submitting in a similar fashion -

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf
"spark.executor.extraJavaOptions=*-XX: -ResizePLAB*", but it doesn't work.

Thanks,
Aakash.