Re: Problem with Kafka group.id

2020-03-24 Thread Spico Florin
Hello!

Maybe you can find more information on the same issue reported here:
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSourceProvider.html


validateGeneralOptions makes sure that group.id has not been specified and
reports an IllegalArgumentException otherwise:

Kafka option 'group.id' is not supported as user-specified consumer
groups are not used to track offsets.

https://github.com/Azure/azure-event-hubs-for-kafka/issues/35
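
As far as I know (please double-check against the release notes of the version
you are running), newer Spark releases (3.0+) added a kafka.group.id source
option precisely for ACL-restricted clusters, so upgrading may remove the need
to patch the source. A minimal Scala sketch, with the broker, topic and group
name as placeholders:

// Hedged sketch: the kafka.group.id option is documented for the Spark 3.0+
// structured streaming Kafka source; on 2.x it is still rejected.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topic")
  .option("kafka.group.id", "my-acl-approved-group") // consumer group allowed by the ACLs
  .load()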

I hope it helps, Florin

On Mon, Mar 23, 2020 at 5:45 PM Sethupathi T
 wrote:

> I had the exact same issue. As a temporary fix, I took the open source code
> from GitHub, modified the logic that makes group.id mandatory, and built a
> customized library.
>
> Thanks,
>
> On Tue, Mar 17, 2020 at 7:34 AM Sjoerd van Leent <
> sjoerd.van.le...@alliander.com> wrote:
>
>> Dear reader,
>>
>>
>>
>> I must force the group.id of Kafka, as Kafka is under ACL control,
>> however, doing so gives me the error:
>>
>>
>>
>> Kafka option 'group.id' is not supported as user-specified consumer
>> groups are not used to track offsets.
>>
>>
>>
>> This won’t work for us: not being able to set it basically disqualifies
>> using Spark within our organization. How can I force (Py)Spark to respect
>> the group.id we need to use?
>>
>>
>>
>> Kind regards,
>>
>>
>>
>> *Sjoerd van Leent*
>>
>> Systeem Engineer | IT AST-B CSC
>>
>>
>>
>> *M   *+31 6 11 24 52 27
>> *E *   sjoerd.van.le...@alliander.com
>>
>>
>> *Alliander N.V.*  .  Postbus 50, 6920 AB Duiven, Nederland  .
>> Locatiecode: 2PB2100  .  Utrechtseweg 68, 6812 AH Arnhem  .
>> KvK 09104351 Arnhem  .  *www.alliander.com*
>>
>>
>>
>


Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-08 Thread Spico Florin
Hi!
What file system are you using: EMRFS or HDFS?
Also, how much memory are you using for the reducers?
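
As a rough back-of-envelope (assuming, very approximately, that each serialized
map status carries on the order of R/8 bytes for its per-reducer block bitmap):
300,000 map tasks x 300,000 reduce partitions / 8 is roughly 11 GB before
compression, far past the 2 GiB limit of a single byte array. The same estimate
gives about 5 GB for 200k x 200k and about 1.25 GB for 100k x 100k, which would
be consistent with the observation below that halving the task counts made the
problem go away.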

On Thu, Nov 7, 2019 at 8:37 PM abeboparebop  wrote:

> I ran into the same issue processing 20TB of data, with 200k tasks on both
> the map and reduce sides. Reducing to 100k tasks each resolved the issue.
> But this could/would be a major problem in cases where the data is bigger
> or
> the computation is heavier, since reducing the number of partitions may not
> be an option.
>
>
> harelglik wrote
> > I understand the error is because the number of partitions is very high,
> > yet when processing 40 TB (and this number is expected to grow) this
> > number
> > seems reasonable:
> > 40TB / 300,000 will result in partitions size of ~ 130MB (data should be
> > evenly distributed).
> >
> > On Fri, Sep 7, 2018 at 6:28 PM Vadim Semenov 
>
> > vadim@
>
> >  wrote:
> >
> >> You have too many partitions, so when the driver is trying to gather
> >> the status of all map outputs and send back to executors it chokes on
> >> the size of the structure that needs to be GZipped, and since it's
> >> bigger than 2GiB, it produces OOM.
> >> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman 
>
> > harelglik@
>
> > 
> >> wrote:
> >> >
> >> > Hi,
> >> >
> >> > We are running a Spark (2.3.1) job on an EMR cluster with 500
> >> r3.2xlarge
> >> (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
> >> >
> >> > It processes ~40 TB of data using aggregateByKey in which we specify
> >> numPartitions = 300,000.
> >> > Map side tasks succeed, but reduce side tasks all fail.
> >> >
> >> > We notice the following driver error:
> >> >
> >> > 18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
> >> >
> >> >  java.lang.OutOfMemoryError
> >> >
> >> > at
> >>
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> >> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> >> > at
> >>
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> >> > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> >> > at
> >>
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> >> > at
> >> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> >> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> >> > at
> >>
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> >> > at
> >>
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> >> > at
> >>
> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
> >> > at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
> >> > at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
> >> > at
> >>
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
> >> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
> >> > at
> >>
> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> >> > at
> >>
> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> >> > at
> >>
> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> >> > at
> >>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> > at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> > at java.lang.Thread.run(Thread.java:748)
> >> > Exception in thread "map-output-dispatcher-0"
> >> java.lang.OutOfMemoryError
> >> > at
> >>
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> >> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> >> > at
> >>
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> >> > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> >> > at
> >>
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> >> > at
> >> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> >> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> >> > at
> >>
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> >> > at
> >>
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> >> > at
> >>
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> >> > at
> >> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> >> > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> >> > at
> >> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> >> > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> >> > at
> >>
> 

[Spark Streaming] Apply multiple ML pipelines(Models) to the same stream

2019-10-31 Thread Spico Florin
Hello!

I have a use case where I have to apply multiple already-trained models
(e.g. M1, M2, ..., Mn) to the same Spark stream (fetched from Kafka).

The models were trained using the isolation forest algorithm from here:
https://github.com/titicaca/spark-iforest


I have found something similar to my case here:
https://www.youtube.com/watch?v=EhRHQPCdldI, but unfortunately I don't know
whether the company Genesys (formerly AltoCloud) made that API (StreamPipeline,
Heterogeneous Pipeline) open source.

I handled this with the code sketched below, but I don't know how optimal it is.

// read the stream
val kafkaStreamDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", "topic")
  .load

val myModels = Array("m1", "m2", "m3", "m4")

// parallelize over the input models in order to have multiple threads handling
// the same stream, otherwise blocked??
myModels.par.foreach(lm => {

  // load the model
  val model = PipelineModel.load(lm)

  kafkaStreamDF.writeStream.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
    // apply the model
    val pdf = model.transform(batchDF)
      .selectExpr("CAST(to_json(struct(*)) AS STRING) AS value")
      .write
      .format("json")
      .save("anom/" + lm + System.currentTimeMillis())
  }).start().awaitTermination()
})
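
A variant I am weighing (a sketch only; it assumes all the models fit
comfortably on the driver and reuses the placeholder model names from above)
would be to load every model once, keep a single streaming query, and apply all
the models to each micro-batch inside one foreachBatch, so the Kafka topic is
read only once:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.DataFrame

// load each model once up front; "m1".."m4" and the output path are placeholders
val models = Array("m1", "m2", "m3", "m4").map(name => name -> PipelineModel.load(name))

val query = kafkaStreamDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist() // reuse the same micro-batch for every model
    models.foreach { case (name, model) =>
      model.transform(batchDF)
        .selectExpr("CAST(to_json(struct(*)) AS STRING) AS value")
        .write
        .format("json")
        .save(s"anom/$name/batch_$batchId")
    }
    batchDF.unpersist()
  }
  .start()

query.awaitTermination()

With this shape there is a single query instead of one per model, at the cost
of the models being applied sequentially within each micro-batch.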

Questions:
1. Therefore, I would like to know whether there is any Spark API for
handling such a use case.

2. If yes, where can I find it?

3. If not, how can I implement this optimally?

Any ideas or suggestions are highly appreciated.

Thanks.
 Florin


Re: Kafka Integration libraries put in the fat jar

2019-07-31 Thread Spico Florin
Hi!
 Thanks to Jacek Laskowski
<https://stackoverflow.com/users/1305344/jacek-laskowski>, I found the
answer here

https://stackoverflow.com/questions/51792203/how-to-get-spark-kafka-org-apache-sparkspark-sql-kafka-0-10-2-112-1-0-dependen

Just add the maven shade plugin:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.apache.spark.examples.sql.streaming.JavaStructuredKafkaWordCount</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

On Tue, Jul 30, 2019 at 4:38 PM Spico Florin  wrote:

> Hello!
>
> I would like to use Spark Structured Streaming integrated with Kafka in
> the way described here:
>
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
>
> but I got the following issue:
>
> Caused by: org.apache.spark.sql.AnalysisException: Failed to find data
> source: kafka. Please deploy the application as per the deployment section
> of "Structured Streaming + Kafka Integration Guide".;
>
> even though I've added the kafka-sql dependency to the generated fat jar:
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
>     <version>2.4.3</version>
>     <scope>compile</scope>
> </dependency>
>
> When I submit with the command
>
> spark-submit --master spark://spark-master:7077 --class myClass
> --deploy-mode client --packages
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
> my-fat-jar-with-dependencies.jar
>
> the problem is gone.
>
> Since the --packages option requires downloading the libraries from an
> environment that has internet access, which I don't have, can you please
> advise what I can do to add the Kafka dependencies either to the fat jar or
> via another solution?
>
> Thank you.
>
> Regards,
>
> Florin
>
>
>


Kafka Integration libraries put in the fat jar

2019-07-30 Thread Spico Florin
Hello!

I would like to use Spark Structured Streaming integrated with Kafka in
the way described here:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html


but I got the following issue:

Caused by: org.apache.spark.sql.AnalysisException: Failed to find data
source: kafka. Please deploy the application as per the deployment section
of "Structured Streaming + Kafka Integration Guide".;

even though I've added the kafka-sql dependency to the generated fat jar:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.4.3</version>
    <scope>compile</scope>
</dependency>

When I submit with the command

spark-submit --master spark://spark-master:7077 --class myClass
--deploy-mode client --packages
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
my-fat-jar-with-dependencies.jar

the problem is gone.

Since the --packages option requires downloading the libraries from an
environment that has internet access, which I don't have, can you please
advise what I can do to add the Kafka dependencies either to the fat jar or
via another solution?
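
Another option I am considering (assuming I can copy the jars over manually) is
to download the connector jar and its transitive Kafka client dependency on a
machine that does have internet access and pass them explicitly at submit time,
e.g. spark-submit --jars spark-sql-kafka-0-10_2.11-2.4.3.jar,<its kafka-clients
jar> ..., so nothing needs to be resolved online during the submit itself.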

Thank you.

Regards,

Florin


Dataframe Publish to RabbitMQ

2019-06-21 Thread Spico Florin
Hello!
Can you please share some code/thoughts on how to publish data from a
dataframe to RabbitMQ?
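
What I have sketched so far looks roughly like this (a sketch only, using the
plain RabbitMQ Java client; the host, queue name and JSON serialization are
placeholder choices, and the client jar must be on the executors' classpath):

import com.rabbitmq.client.ConnectionFactory
import org.apache.spark.sql.DataFrame

def publishToRabbit(df: DataFrame, host: String, queue: String): Unit = {
  // serialize each row as JSON and publish from the executors,
  // one connection/channel per partition
  df.toJSON.foreachPartition { (rows: Iterator[String]) =>
    val factory = new ConnectionFactory()
    factory.setHost(host)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare(queue, /* durable */ true, false, false, null)
    try {
      rows.foreach(json => channel.basicPublish("", queue, null, json.getBytes("UTF-8")))
    } finally {
      channel.close()
      connection.close()
    }
  }
}

The queueDeclare parameters and the JSON format are just guesses on my side;
what I am mainly after is whether foreachPartition is the right level at which
to manage the connections.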

Thanks.
Regards,
Florin


Re: Run/install tensorframes on zeppelin pyspark

2018-08-10 Thread Spico Florin
Hello!
  Thank you very much for your response.
As I understood it, in order to use tensorframes in a Zeppelin pyspark notebook
with the spark master set to local:
1. we should run the command pip install tensorframes
2. we should set up PYSPARK_PYTHON in conf/zeppelin-env.sh

I have performed the above steps like this:

python2.7 -m pip install tensorframes==0.2.7
export PYSPARK_PYTHON=python2.7 in conf/zeppelin-env.sh
"zeppelin.pyspark.python": "python2.7" in conf/interpreter.json

As you can see, the installation and the configuration refer to the same
python2.7 version.
After performing all of these steps, I'm still getting the same error:
"ImportError: No module named tensorframes"

I'm still puzzled that this import works fine in the pyspark shell shipped
with Spark, yet, for example, results in errors in python2.7.
Also, I've observed that the pyspark shell from spark/bin doesn't need the
tensorframes python package installed, which is even more confusing.
Is the Zeppelin pyspark interpreter not using the same approach as the Spark
pyspark shell?

Has someone succeeded in importing/using tensorframes correctly in Zeppelin
with the default spark master setup (local[*])? If yes, how?

I look forward to your answers.

Regards,
 Florin

On Thu, Aug 9, 2018 at 3:52 AM, Jeff Zhang  wrote:

>
> Make sure you use the correct python which has tensorframes installed. Use
> PYSPARK_PYTHON to configure the python.
>
>
>
> Spico Florin wrote on Wed, Aug 8, 2018 at 9:59 PM:
>
>> Hi!
>>
>> I would like to use tensorframes in my pyspark notebook.
>>
>> I have performed the following:
>>
>> 1. In the spark interpreter added a new repository:
>> http://dl.bintray.com/spark-packages/maven
>> 2. In the spark interpreter added the dependency
>> databricks:tensorframes:0.2.9-s_2.11
>> 3. pip install tensorframes
>>
>>
>> In both 0.7.3 and 0.8.0:
>> 1.  the following code resulted in error: "ImportError: No module named
>> tensorframes"
>>
>> %pyspark
>> import tensorframes as tfs
>>
>> 2. the following code succeeded
>> %spark
>> import org.tensorframes.{dsl => tf}
>> import org.tensorframes.dsl.Implicits._
>> val df = spark.createDataFrame(Seq(1.0->1.1, 2.0->2.2)).toDF("a", "b")
>>
>> // As in Python, scoping is recommended to prevent name collisions.
>> val df2 = tf.withGraph {
>> val a = df.block("a")
>> // Unlike python, the scala syntax is more flexible:
>> val out = a + 3.0 named "out"
>> // The 'mapBlocks' method is added using implicits to dataframes.
>> df.mapBlocks(out).select("a", "out")
>> }
>>
>> // The transform is all lazy at this point, let's execute it with collect:
>> df2.collect()
>>
>> I ran the code above directly with spark interpreter with the default
>> configurations (master set up to local[*] - so not via spark-submit
>> command) .
>>
>> Also, I have installed spark home locally and ran the command
>>
>> $SPARK_HOME/bin/pyspark --packages databricks:tensorframes:0.2.9-s_2.11
>>
>> and the code below worked as expected
>>
>> import tensorframes as tfs
>>
>>  Can you please help to solve this?
>>
>> Thanks,
>>
>>  Florin
>>
>>
>>
>>
>>
>>
>>
>>
>>


Run/install tensorframes on zeppelin pyspark

2018-08-08 Thread Spico Florin
Hi!

I would like to use tensorframes in my pyspark notebook.

I have performed the following:

1. In the spark interpreter added a new repository:
http://dl.bintray.com/spark-packages/maven
2. In the spark interpreter added the
dependency databricks:tensorframes:0.2.9-s_2.11
3. pip install tensorframes


In both 0.7.3 and 0.8.0:
1.  the following code resulted in error: "ImportError: No module named
tensorframes"

%pyspark
import tensorframes as tfs

2. the following code succeeded
%spark
import org.tensorframes.{dsl => tf}
import org.tensorframes.dsl.Implicits._
val df = spark.createDataFrame(Seq(1.0->1.1, 2.0->2.2)).toDF("a", "b")

// As in Python, scoping is recommended to prevent name collisions.
val df2 = tf.withGraph {
  val a = df.block("a")
  // Unlike python, the scala syntax is more flexible:
  val out = a + 3.0 named "out"
  // The 'mapBlocks' method is added using implicits to dataframes.
  df.mapBlocks(out).select("a", "out")
}

// The transform is all lazy at this point, let's execute it with collect:
df2.collect()

I ran the code above directly with the spark interpreter with the default
configuration (master set to local[*], so not via the spark-submit
command).

Also, I have installed spark home locally and ran the command

$SPARK_HOME/bin/pyspark --packages databricks:tensorframes:0.2.9-s_2.11

and the code below worked as expected

import tensorframes as tfs

 Can you please help to solve this?

Thanks,

 Florin


Dataframe to automatically create Impala table when writing to Impala

2018-06-22 Thread Spico Florin
Hello!
  I would like to know whether there is any feature in the Spark DataFrame API
that, when writing data to an Impala table, also creates that table when it
was not previously created in Impala.

For example, the code:

myDatafarme.write.mode(SaveMode.Overwrite).jdbc(jdbcURL, "books",
connectionProperties)

should create the table if it doesn't exist.

The table schema should be determined from the dataframe schema.
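
As a fallback I have been sketching something along these lines (only a sketch;
the type mapping is naive and the JDBC URL and driver setup are placeholders):
derive a CREATE TABLE statement from df.schema and issue it over plain JDBC
before calling write.jdbc:

import java.sql.DriverManager
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._

// Sketch only: maps a few common Spark types to SQL types; anything else falls
// back to the type's simpleString, which may not be valid for Impala.
def createTableIfMissing(jdbcURL: String, table: String, df: DataFrame): Unit = {
  val columns = df.schema.fields.map { f =>
    val sqlType = f.dataType match {
      case StringType    => "STRING"
      case IntegerType   => "INT"
      case LongType      => "BIGINT"
      case DoubleType    => "DOUBLE"
      case TimestampType => "TIMESTAMP"
      case other         => other.simpleString.toUpperCase
    }
    s"${f.name} $sqlType"
  }.mkString(", ")

  val connection = DriverManager.getConnection(jdbcURL)
  try {
    connection.createStatement().execute(s"CREATE TABLE IF NOT EXISTS $table ($columns)")
  } finally {
    connection.close()
  }
}

Usage would be createTableIfMissing(jdbcURL, "books", myDatafarme) just before
the write.jdbc call above, but I would prefer a built-in way if one exists.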

I look forward to your suggestions/ideas.

Regards,
 Florin


Spark 1.6 change the number partitions without repartition and without shuffling

2018-06-13 Thread Spico Florin
Hello!

 I have a parquet file of 60MB representing 10 million records.
When I read this file using Spark 2.3.0 with the configuration
spark.sql.files.maxPartitionBytes=1024*1024*2 (=2MB), I got 29
partitions, as expected.
Code:
 sqlContext.setConf("spark.sql.files.maxPartitionBytes",
Long.toString(2097152));
DataFrame inputDataDf = sqlContext.read().parquet("10Mrecords.parquet");


But when I read the same file with Spark 1.6.0, the above configuration
does not take effect and I get a single partition. Thus a single task does
all the processing and there is no parallelism.

Also, I have used the following configurations without any effect:

Write the parquet file with a different block size in order to increase the
number of row groups:
 sparkContext.hadoopConfiguration.setLong("parquet.block.size", 1024*50)

 sparkContext.hadoopConfiguration.setLong("mapred.max.split.size", 1024*50)


My question is:
How can I achieve the same behavior (get the desired number of partitions)
when using Spark 1.6, without the repartition method and without any method
that incurs shuffling?

I look forward to your answers.
 Regards,
  Florin


Re: testing frameworks

2018-06-04 Thread Spico Florin
Hello!
  Thank you very much for your helpful answer and for the very good job done
on spark-testing-base. I managed to perform unit testing with the
spark-testing-base library following the provided article, and I also got
inspired by
https://github.com/holdenk/spark-testing-base/blob/master/src/test/1.3/java/com/holdenkarau/spark/testing/SampleJavaRDDTest.java


I had some concerns regarding how to deal with comparing the RDDs that
come from a DataFrame and the ones that come from the jsc().parallelize method.

My test workflow is as follows:
1. Get the data from a parquet file as a dataframe
2. Convert the dataframe with toJavaRDD()
3. Perform some mapping on the JavaRDD
4. Check whether the resulting mapped RDD is equal to the expected one
(retrieved from a text file)

I performed the above test with the following code snippet:

 JavaRDD expected = jsc().parallelize(input_from_text_file);
SparkSession spark = SparkSession.builder().getOrCreate();

JavaRDD input =

spark.read().parquet("src/test/resources/test_data.parquet").toJavaRDD();

JavaRDD result = MyDriver.convertToMyCustomerData(input);
 JavaRDDComparisons.assertRDDEquals(expected, result);

The above test failed, even though the data is the same. By
debugging the code, I observed that the data that came from the
DataFrame didn't have the same order as the data that came from
jsc().parallelize(text_file).

So, I suppose that the issue came from the fact that the SparkSession and
jsc() don't share the same SparkContext (there is a warning about this when
running the program).

Therefore I came to the solution of using the same jsc for both the
expected and the result RDDs. With this solution the assertion succeeded as
expected.

  List df = spark.read().parquet("src/test/resources/test_data.parquet")
      .toJavaRDD().collect();
  JavaRDD input = jsc().parallelize(df);

  JavaRDD result = MyDriver.convertToMyCustomerData(input);
  JavaRDDComparisons.assertRDDEquals(expected, result);


My questions are:
1. What is the best way to handle RDD comparison when one RDD is built from a
DataFrame and it is tested against an RDD obtained via
jsc().parallelize()?
2. Is the above solution a suitable one?

I look forward to your answers.

Regards,
  Florin

On Wed, May 30, 2018 at 3:11 PM, Holden Karau  wrote:

> So Jessie has an excellent blog post on how to use it with Java
> applications -
> http://www.jesse-anderson.com/2016/04/unit-testing-spark-with-java/
>
> On Wed, May 30, 2018 at 4:14 AM Spico Florin 
> wrote:
>
>> Hello!
>>   I'm also looking for unit testing spark Java application. I've seen the
>> great work done in  spark-testing-base but it seemed to me that I could
>> not use for Spark Java applications.
>> Only spark scala applications are supported?
>> Thanks.
>> Regards,
>>  Florin
>>
>> On Wed, May 23, 2018 at 8:07 AM, umargeek 
>> wrote:
>>
>>> Hi Steve,
>>>
>>> you can try out pytest-spark plugin if your writing programs using
>>> pyspark
>>> ,please find below link for reference.
>>>
>>> https://github.com/malexer/pytest-spark
>>> <https://github.com/malexer/pytest-spark>
>>>
>>> Thanks,
>>> Umar
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>> --
> Twitter: https://twitter.com/holdenkarau
>


Re: testing frameworks

2018-05-30 Thread Spico Florin
Hello!
  I'm also looking into unit testing a Spark Java application. I've seen the
great work done in spark-testing-base, but it seemed to me that I could not
use it for Spark Java applications.
Are only Spark Scala applications supported?
Thanks.
Regards,
 Florin

On Wed, May 23, 2018 at 8:07 AM, umargeek 
wrote:

> Hi Steve,
>
> you can try out pytest-spark plugin if your writing programs using pyspark
> ,please find below link for reference.
>
> https://github.com/malexer/pytest-spark
> 
>
> Thanks,
> Umar
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Does saveAsHadoopFile depend on master?

2016-06-22 Thread Spico Florin
Hi!
 I had a similar issue when the user that submitted the job to the Spark
cluster didn't have permission to write into HDFS. If you have the HDFS GUI,
you can check which users exist and what permissions they have. You can also
check in the HDFS browser (
http://stackoverflow.com/questions/27996034/opening-a-hdfs-file-in-browser
) whether your folder structure was created.

If you have YARN, an example could be: spark-submit --class "your.class"
--master yarn-client --num-executors 12 --executor-memory 16g
--driver-memory 8g --executor-cores 8 <your-jar>.jar
 hdfs:output

I hope it helps.
 Regards,
 Florin

On Wed, Jun 22, 2016 at 4:57 AM, Jeff Zhang  wrote:

> Please check the driver and executor log, there should be logs about where
> the data is written.
>
>
>
> On Wed, Jun 22, 2016 at 2:03 AM, Pierre Villard <
> pierre.villard...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a Spark job writing files to HDFS using .saveAsHadoopFile method.
>>
>> If I run my job in local/client mode, it works as expected and I get all
>> my files written in HDFS. However if I change to yarn/cluster mode, I don't
>> see any error logs (the job is successful) and there is no files written to
>> HDFS.
>>
>> Is there any reason for this behavior? Any thoughts on how to track down
>> what is happening here?
>>
>> Thanks!
>>
>> Pierre.
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Using Log4j for logging messages inside lambda functions

2015-05-26 Thread Spico Florin
Hello!
  Thank you all for your answers. Akhil's proposed solution works fine.
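For the archives, the shape of the fix is roughly the following (a sketch along
the lines of the factory-object suggestion below; the logger lives in a
separate object and is created lazily wherever the code runs, instead of being
closed over from the driver):

import org.apache.log4j.Logger

// Holder object: nothing here is captured in the closure; the logger is
// (re)created lazily on the driver or on each executor as needed.
object LogHolder extends Serializable {
  @transient lazy val log: Logger = Logger.getLogger(getClass.getName)
}

val someRdd = spark.parallelize(List(1, 2, 3))
someRdd.map { element =>
  LogHolder.log.info(s"$element will be processed")
  element + 1
}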
Thanks.
 Florin

On Tue, May 26, 2015 at 3:08 AM, Wesley Miao wesley.mi...@gmail.com wrote:

 The reason it didn't work for you is that the function you registered with
 someRdd.map will be running on the worker/executor side, not in your
 driver's program. Then you need to be careful to not accidentally close
 over some objects instantiated from your driver's program, like the log
 object in your sample code above. You can look for more information online
 to understand more the concept of Closure so that you can understand to
 the bottom of it why it didn't work for you at first place.

 The usual solution to this type of problems is to instantiate the objects
 you want to use in your map functions from within your map functions. You
 can define a factory object that you can create your log object from.

 On Mon, May 25, 2015 at 11:05 PM, Spico Florin spicoflo...@gmail.com
 wrote:

 Hello!
   I would like to use the logging mechanism provided by the log4j, but
 I'm getting the
 Exception in thread main org.apache.spark.SparkException: Task not
 serializable - Caused by: java.io.NotSerializableException:
 org.apache.log4j.Logger

 The code (and the problem) that I'm using resembles the one used here :
 http://stackoverflow.com/questions/29208844/apache-spark-logging-within-scala,
 meaning:

 val log = Logger.getLogger(getClass.getName)

 def doTest() {
   val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
   val spark = new SparkContext(conf)

   val someRdd = spark.parallelize(List(1, 2, 3))
   someRdd.map { element =>
     log.info(s"$element will be processed")
     element + 1
   }
 }
 I'm posting the same problem due to the fact that the one from
 stackoverflow didn't get any answer.
 In this case, can you please tell us what is the best way to use  logging?
 Is any solution that is not using the rdd.forEachPartition?

 I look forward for your answers.
 Regards,
 Florin










Using Log4j for logging messages inside lambda functions

2015-05-25 Thread Spico Florin
Hello!
  I would like to use the logging mechanism provided by log4j, but I'm
getting the following:
Exception in thread "main" org.apache.spark.SparkException: Task not
serializable - Caused by: java.io.NotSerializableException:
org.apache.log4j.Logger

The code (and the problem) that I'm using resembles the one used here :
http://stackoverflow.com/questions/29208844/apache-spark-logging-within-scala,
meaning:

val log = Logger.getLogger(getClass.getName)

def doTest() {
  val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
  val spark = new SparkContext(conf)

  val someRdd = spark.parallelize(List(1, 2, 3))
  someRdd.map { element =>
    log.info(s"$element will be processed")
    element + 1
  }
}
I'm posting the same problem because the one from Stack Overflow didn't get
any answer.
In this case, can you please tell us what is the best way to use logging?
Is there any solution that does not use rdd.foreachPartition?

I look forward to your answers.
Regards,
Florin


Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

2015-04-24 Thread Spico Florin
Hello!
  I know that HadoopRDD partitions are built based on the number of splits
in HDFS. I'm wondering if these partitions preserve the initial order of
data in file.
As an example, if I have an HDFS (myTextFile) file that has these splits:

split 0 - line 1, ..., line k
split 1 - line k+1, ..., line k+n
split 2 - line k+n+1, ..., line k+n+m

and the code
val lines = sc.textFile("hdfs://mytextFile")
lines.zipWithIndex()

will the order of the lines be preserved?
(line 1, zipIndex 1), ..., (line k, zipIndex k), and so on.

I found this question on stackoverflow (
http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd)
whose answer intrigued me:
Essentially, RDD's zipWithIndex() method seems to do this, but it won't
preserve the original ordering of the data the RDD was created from

Can you please confirm whether this is the correct answer?

Thanks.
 Florin


Spark RDD sortByKey triggering a new job

2015-04-24 Thread Spico Florin
I have tested the sortByKey method with the following code and I have observed
that it triggers a new job when it is called. I could not find this documented
either in the API or in the code. Is this an intended behavior? For example,
the API of the RDD zipWithIndex method specifies that it will trigger a new
job. But what about sortByKey?

val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
val l = sc.parallelize(List((5,'c'),(2,'d'),(1,'a'),(7,'e')), 3)

l.sortByKey()

Thanks for your answers.


Order of execution of tasks inside of a stage and computing the number of stages

2015-04-20 Thread Spico Florin
Hello!
I'm a newbie in Spark and I would like to understand some basic mechanisms of
how it works behind the scenes.
I have attached the lineage of my RDD and I have the following questions:
1. Why do I have 8 stages instead of 5? From the book Learning Spark
(Chapter 8 - http://bit.ly/1E0Hah7), I understood that "RDDs that
exist at the same level of indentation as their
parents will be pipelined [into the same physical stage] during physical
execution". Since I have 5 parents, I expected to have 5 stages. Still,
the Spark UI stages view shows 8 stages.
Also, what does the (8) in the debug string represent? Is there any bug in
this function?
2. At the stage level, what is the execution order among the tasks? Can they
all be executed in parallel (for example: test4spark.csv
HadoopRDD[0] || test4spark.csv MappedRDD[1] || MapPartitionsRDD[4] ||
ZippedWithIndexRDD[6]), or does each task wait for the one before it to
complete (test4spark.csv HadoopRDD[0] =completed= test4spark.csv
MappedRDD[1] =completed= etc.)?
3. Between stages, the order is given by the execution plan, so each stage
waits until the ones before it have completed. Is this a correct
assumption?

I look forward to your answers.
Regards,
 Florin


(8) MappedRDD[21] at map at WAChunkSepvgFilterNewModel.scala:298 []
 |  MappedRDD[20] at map at WAChunkSepvgFilterNewModel.scala:182 []
 |  ShuffledRDD[19] at sortByKey at WAChunkSepvgFilterNewModel.scala:182 []
 +-(8) ShuffledRDD[16] at aggregateByKey at
WAChunkSepvgFilterNewModel.scala:182 []
+-(8) FlatMappedRDD[15] at flatMap at
WAChunkSepvgFilterNewModel.scala:174 []
   |  ZippedWithIndexRDD[14] at zipWithIndex at
WAChunkSepvgFilterNewModel.scala:174 []
   |  MappedRDD[13] at map at WAChunkSepvgFilterNewModel.scala:272 []
   |  MappedRDD[12] at map at WAChunkSepvgFilterNewModel.scala:161 []
   |  ShuffledRDD[11] at sortByKey at
WAChunkSepvgFilterNewModel.scala:161 []
   +-(8) ShuffledRDD[8] at aggregateByKey at
WAChunkSepvgFilterNewModel.scala:161 []
  +-(8) FlatMappedRDD[7] at flatMap at
WAChunkSepvgFilterNewModel.scala:153 []
 |  ZippedWithIndexRDD[6] at zipWithIndex at
WAChunkSepvgFilterNewModel.scala:153 []
 |  MappedRDD[5] at map at WAChunkSepvgFilterNewModel.scala:248
[]
 |  MapPartitionsRDD[4] at mapPartitionsWithIndex at
WAChunkSepvgFilterNewModel.scala:114 []
 |  test4spark.csv MappedRDD[1] at textFile at
WAChunkSepvgFilterNewModel.scala:215 []
 |  test4spark.csv HadoopRDD[0] at textFile at
WAChunkSepvgFilterNewModel.scala:215 []


Excerpt from the book: The lineage output shown in
Example 8-8 uses indentation levels to show where RDDs are going to be
pipelined
together into physical stages. RDDs that exist at the same level of
indentation as their
parents will be pipelined during physical execution



Re: Save org.apache.spark.mllib.linalg.Matrix to a file

2015-04-16 Thread Spico Florin
Thank you very much for your suggestions, Ignacio!
  I have posted my solution here:
http://stackoverflow.com/questions/29649904/save-spark-org-apache-spark-mllib-linalg-matrix-to-a-file/29671193#29671193
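
In short, the approach boils down to something like this (a sketch only;
Matrix.toArray is column-major, so transposing first yields row-major order,
and the output path is a placeholder):

import org.apache.spark.mllib.linalg.Matrix

def saveMatrix(sc: org.apache.spark.SparkContext, m: Matrix, path: String): Unit = {
  // transpose, then toArray gives the values in row-major order of the original matrix
  val rows = m.transpose.toArray.grouped(m.numCols).toSeq
  val lines = rows.map(_.mkString(" "))
  sc.parallelize(lines).saveAsTextFile(path)
}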

Best regards,
  Florin

On Wed, Apr 15, 2015 at 5:28 PM, Ignacio Blasco elnopin...@gmail.com
wrote:

 You can turn the Matrix to an Array with .toArray and then:
 1- Write it using Scala/Java IO to the local disk of the driver
 2- parallelize it and use .saveAsTextFile

 2015-04-15 14:16 GMT+02:00 Spico Florin spicoflo...@gmail.com:

 Hello!

 The result of correlation in Spark MLLib is a of type
 org.apache.spark.mllib.linalg.Matrix. (see
 http://spark.apache.org/docs/1.2.1/mllib-statistics.html#correlations)

 val data: RDD[Vector] = ...

 val correlMatrix: Matrix = Statistics.corr(data, pearson)

 I would like to save the result into a file. How can I do this?

  Thanks,

  Florin





Save org.apache.spark.mllib.linalg.Matrix to a file

2015-04-15 Thread Spico Florin
Hello!

The result of a correlation in Spark MLlib is of type
org.apache.spark.mllib.linalg.Matrix. (see
http://spark.apache.org/docs/1.2.1/mllib-statistics.html#correlations)

val data: RDD[Vector] = ...

val correlMatrix: Matrix = Statistics.corr(data, "pearson")

I would like to save the result into a file. How can I do this?

 Thanks,

 Florin


Matrix Transpose

2015-04-02 Thread Spico Florin
Hello!
  I have a CSV file that has the following content:
C1;C2;C3
11;22;33
12;23;34
13;24;35
 What is the best approach, using Spark (API, MLlib), to achieve its
transpose?
C1 11 12 13
C2 22 23 24
C3 33 34 35


I look forward to your solutions and suggestions (some Scala code would be
really helpful).
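
The closest I have come up with myself so far goes through a coordinate
representation (only a sketch, assuming the Spark version at hand provides
CoordinateMatrix.transpose (1.3+), that the header row is handled separately,
and that all remaining values are numeric; the file path is a placeholder):

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// build (row, col, value) entries, keeping the original row order via zipWithIndex
val entries = sc.textFile("data.csv")
  .zipWithIndex()
  .filter { case (_, rowIdx) => rowIdx > 0 }        // drop the header line
  .flatMap { case (line, rowIdx) =>
    line.split(";").zipWithIndex.map { case (value, colIdx) =>
      MatrixEntry(rowIdx - 1, colIdx, value.toDouble)
    }
  }

val transposed = new CoordinateMatrix(entries).transpose()
// transposed.entries is again an RDD[MatrixEntry] that can be reshaped/saved as needed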

Thanks.
 Florin

P.S. In reality my matrix has more than 1000 columns and more than 1
million rows.


Re: Optimal solution for getting the header from CSV with Spark

2015-03-25 Thread Spico Florin
Hello!
  Thanks for your responses. I was afraid that, due to partitioning, I would
lose the guarantee that the first element is the header. I observe that
rdd.first calls the rdd.take(1) method behind the scenes.
Best regards,
  Florin

On Tue, Mar 24, 2015 at 4:41 PM, Dean Wampler deanwamp...@gmail.com wrote:

 Instead of data.zipWithIndex().filter(_._2==0), which will cause Spark to
 read the whole file, use data.take(1), which is simpler.

 From the Rdd.take documentation, it works by first scanning one partition,
 and using the results from that partition to estimate the number of
 additional partitions needed to satisfy the limit. In this case, it will
 trivially stop at the first.


 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Tue, Mar 24, 2015 at 7:12 AM, Spico Florin spicoflo...@gmail.com
 wrote:

 Hello!

 I would like to know what is the optimal solution for getting the header
 with from a CSV file with Spark? My aproach was:

 def getHeader(data: RDD[String]): String = {
   data.zipWithIndex().filter(_._2 == 0).map(x => x._1).take(1).mkString()
 }

 Thanks.





Optimal solution for getting the header from CSV with Spark

2015-03-24 Thread Spico Florin
Hello!

I would like to know what is the optimal solution for getting the header
from a CSV file with Spark? My approach was:

def getHeader(data: RDD[String]): String = {
  data.zipWithIndex().filter(_._2 == 0).map(x => x._1).take(1).mkString()
}

Thanks.


Re: Number of Executors per worker process

2015-03-02 Thread Spico Florin
Hello!
  Thank you very much for your response. In the book Learning Spark I
found out the following sentence:

Each application will have at most one executor on each worker

So a worker can have one executor process spawned per application, or none
(perhaps the number depends on the workload distribution).


Best regards,

 Florin

On Thu, Feb 26, 2015 at 1:04 PM, Jeffrey Jedele jeffrey.jed...@gmail.com
wrote:

 Hi Spico,

 Yes, I think an executor core in Spark is basically a thread in a worker
 pool. It's recommended to have one executor core per physical core on your
 machine for best performance, but I think in theory you can create as many
 threads as your OS allows.

 For deployment:
 There seems to be the actual worker JVM which coordinates the work on a
 worker node. I don't think the actual thread pool lives in there, but a
 separate JVM is created for each application that has cores allocated on
 the node. Otherwise it would be rather hard to impose memory limits on
 application level and it would have serious disadvantages regarding
 stability.

 You can check this behavior by looking at the processes on your machine:
 ps aux | grep spark.deploy = will show  master, worker (coordinator) and
 driver JVMs
 ps aux | grep spark.executor = will show the actual worker JVMs

 2015-02-25 14:23 GMT+01:00 Spico Florin spicoflo...@gmail.com:

 Hello!
  I've read the documentation about the spark architecture, I have the
 following questions:
 1: how many executors can be on a single worker process (JMV)?
 2:Should I think executor like a Java Thread Executor where the pool size
 is equal with the number of the given cores (set up by the
 SPARK_WORKER_CORES)?
 3. If the worker can have many executors, how this is handled by the
 Spark? Or can I handle by myself to set up the number of executors per JVM?

 I look forward for your answers.
   Regards,
   Florin





Number of Executors per worker process

2015-02-25 Thread Spico Florin
Hello!
 I've read the documentation about the spark architecture, I have the
following questions:
1: How many executors can be on a single worker process (JVM)?
2: Should I think of an executor like a Java thread executor where the pool
size is equal to the number of the given cores (set up by
SPARK_WORKER_CORES)?
3. If the worker can have many executors, how is this handled by Spark?
Or can I set up the number of executors per JVM myself?

I look forward to your answers.
  Regards,
  Florin


MLlib usage on Spark Streaming

2015-02-16 Thread Spico Florin
Hello!
  I'm a newbie to Spark and I have the following case study:
1. A client sends the following data every 100 ms:
  {uniqueId, timestamp, measure1, measure2}
2. Every 30 seconds I would like to correlate the data collected in the
window with a predefined double-vector pattern for each given key. The
predefined pattern has 300 records. The data should also be sorted by
timestamp.
3. When the correlation is greater than a predefined threshold (e.g. 0.9) I
would like to emit a new message containing {uniqueId,
doubleCorrelationValue}
4. For the correlation I would like to use MLlib
5. As a programming language I would like to use Java 7.

Can you please give me some suggestions on how to create the skeleton for
the above scenario?

Thanks.
 Regards,
 Florin


Parsing CSV files in Spark

2015-02-06 Thread Spico Florin
Hi!
  I'm new to Spark. I have a case study where the data is stored in CSV
files. These files have headers with more than 1000 columns. I would like
to know the best practices for parsing them, in particular on the
following points:
1. Getting and parsing all the files from a folder
2. What CSV parser do you use?
3. I would like to select just the columns whose names match a pattern,
then pass the selected columns' values (plus the column names) to the
processing, and save the output to a CSV (preserving the selected columns).

If you have any experience with the points above, it would be really helpful
(for me and for others who will encounter the same cases) if you could share
your thoughts.
Thanks.
  Regards,
 Florin


Errors in the workers machines

2015-02-05 Thread Spico Florin
Hello!
 I received the following errors in the workerLog.log files:

ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@stream4:33660]
- [akka.tcp://sparkExecutor@stream4:47929]: Error [Association failed with
[akka.tcp://sparkExecutor@stream4:47929]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@stream4:47929]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: stream4/x.x.x.x:47929
]
(For security reasons I have masked the IP with x.x.x.x.) The same errors
occur for different ports
(42395, 39761).
Even though I get these errors, the application finishes successfully.
I have the following questions:
1. For what reasons is Spark using the above ports? What internal component
is triggering them?
2. How can I get rid of these errors?
3. Why does the application still finish successfully?
4. Why does it retry with more ports?

I look forward to your answers.
  Regards.
 Florin


Reading from CSV file with spark-csv_2.10

2015-02-05 Thread Spico Florin
Hello!
I'm using spark-csv_2.10 with Java from the Maven repository:
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>0.1.1</version>

I would like to use Spark SQL to filter my data. I'm using the
following code:
JavaSchemaRDD cars = new JavaCsvParser().withUseHeader(true).csvFile(
    sqlContext, logFile);
cars.registerAsTable("mytable");
JavaSchemaRDD doll = sqlContext.sql("SELECT TimeStamp FROM mytable");
doll.saveAsTextFile("dolly.csv");

but I'm getting the following error:
Exception in thread "main" java.lang.RuntimeException: [1.8] failure:
``UNION'' expected but `TimeStamp' found

SELECT TimeStamp FROM mytable
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)

Can you please tell me what is the best approach to filter the CSV data
with SQL?
Thank you.
 Regards,
 Florin