Re: Dataframe fails for large resultsize

2016-04-29 Thread Buntu Dev
Thanks Krishna, but I believe the memory consumed on the executors is being
exhausted in my case. I've allocated the max 10g that I can to both the
driver and executors. Are there any alternative solutions to fetching the
top 1M rows after ordering the dataset?

Thanks!
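
One alternative that avoids pulling rows to the driver entirely (which is what
collect() does and what spark.driver.maxResultSize guards): sort, limit, and
write the result straight from the executors. A minimal sketch, assuming Spark
1.5.x, the sqlContext, table and ordering column from this thread, and a
placeholder output path:

// Sort, keep the top 1M rows, and write them out without ever
// materializing the result on the driver.
val top = sqlContext.sql("SELECT * FROM tbl ORDER BY c1 LIMIT 1000000")
top.write.parquet("/path/to/output")  // the write runs on the executors

Note that a global sort followed by a large LIMIT can still funnel data through
a small number of partitions, so this is a sketch to try, not a guaranteed fix.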

On Fri, Apr 29, 2016 at 6:01 PM, Krishna  wrote:

> I recently encountered similar network related errors and was able to fix
> it by applying the ethtool updates described here [
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-5085]
>
>
> On Friday, April 29, 2016, Buntu Dev  wrote:
>
>> Just to provide more details: I have 200 blocks (parquet files) with an avg
>> block size of 70M. Limiting the result set to 100k ("select * from tbl
>> order by c1 limit 100000") works, but when I increase it to, say, 1M I keep
>> running into this error:
>>  Connection reset by peer: socket write error
>>
>> I would ultimately want to store the result set as parquet. Are there any
>> other options to handle this?
>>
>> Thanks!
>>
>> On Wed, Apr 27, 2016 at 11:10 AM, Buntu Dev  wrote:
>>
>>> I have 14GB of parquet data. I'm trying to apply an order by using Spark
>>> SQL and save the first 1M rows, but it keeps failing with "Connection reset
>>> by peer: socket write error" on the executors.
>>>
>>> I've allocated about 10g to both driver and the executors along with
>>> setting the maxResultSize to 10g but still fails with the same error.
>>> I'm using Spark 1.5.1.
>>>
>>> Are there any other alternative ways to handle this?
>>>
>>> Thanks!
>>>
>>
>>


Re: Dataframe fails for large resultsize

2016-04-29 Thread Krishna
I recently encountered similar network related errors and was able to fix
it by applying the ethtool updates described here [
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-5085]

On Friday, April 29, 2016, Buntu Dev  wrote:

> Just to provide more details: I have 200 blocks (parquet files) with an avg
> block size of 70M. Limiting the result set to 100k ("select * from tbl
> order by c1 limit 100000") works, but when I increase it to, say, 1M I keep
> running into this error:
>  Connection reset by peer: socket write error
>
> I would ultimately want to store the result set as parquet. Are there any
> other options to handle this?
>
> Thanks!
>
> On Wed, Apr 27, 2016 at 11:10 AM, Buntu Dev  > wrote:
>
>> I have 14GB of parquet data. I'm trying to apply an order by using Spark
>> SQL and save the first 1M rows, but it keeps failing with "Connection reset
>> by peer: socket write error" on the executors.
>>
>> I've allocated about 10g to both driver and the executors along with
>> setting the maxResultSize to 10g but still fails with the same error.
>> I'm using Spark 1.5.1.
>>
>> Are there any other alternative ways to handle this?
>>
>> Thanks!
>>
>
>


Re: Timeout when submitting an application to a remote Spark Standalone master

2016-04-29 Thread Richard Han
So connecting to the cluster (port 7077) works; that is to say, the Spark
Context is created. The timeout occurs on the worker side when I run any
command with .collect(): the client (local machine) basically waits
forever. I'm wondering if maybe I'm not understanding the architecture
correctly and the worker machines are attempting to RPC/connect back to my
local machine (the client) instead of the master. In that case it
wouldn't work, because my local machine is behind a router on a private
192 address that isn't routable from the cluster.
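
That reading of the architecture is right: executors open connections back to
the driver, so the driver must be reachable from the worker machines. A minimal
sketch of the relevant settings, assuming the client can be given a routable
address and an open, forwarded port (the address and port below are
placeholders, not from the thread):

import org.apache.spark.{SparkConf, SparkContext}

// Advertise an address and port that the workers can actually reach.
val conf = new SparkConf()
  .setAppName("remote-submit-test")
  .setMaster("spark://ip:7077")              // the standalone master, as in the thread
  .set("spark.driver.host", "203.0.113.10")  // placeholder: a routable address for the client
  .set("spark.driver.port", "7001")          // pin the port so it can be opened/forwarded

val sc = new SparkContext(conf)

Behind NAT without port forwarding, no setting will help; the driver then has
to run inside the cluster's network (for example via cluster deploy mode).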

On Fri, Apr 29, 2016 at 3:46 PM, Femi Anthony  wrote:

> Have you tried connecting to the port 7077 on the cluster from your local
> machine to see if it works ok ?
>
> Sent from my iPhone
>
> On Apr 29, 2016, at 5:58 PM, Richard Han  wrote:
>
> I have an EC2 installation of Spark Standalone Master/Worker set up. The
> two can talk to one another, and all ports are open in the security group
> (just to make sure it isn't the port). When I run spark-shell on the master
> node (setting it to --master spark://ip:7077) it runs everything correctly.
> When I try to submit a job from my local machine however I get RPC timeout
> errors. Does anyone know why this is or how to resolve it?
>
> (cross posted at
> http://stackoverflow.com/questions/36947811/application-submitted-to-remote-spark-from-local-pyspark-never-completes
> )
>
> Thanks!
>
>


Re: Timeout when submitting an application to a remote Spark Standalone master

2016-04-29 Thread Femi Anthony
Have you tried connecting to the port 7077 on the cluster from your local 
machine to see if it works ok ?

Sent from my iPhone

> On Apr 29, 2016, at 5:58 PM, Richard Han  wrote:
> 
> I have an EC2 installation of Spark Standalone Master/Worker set up. The two 
> can talk to one another, and all ports are open in the security group (just 
> to make sure it isn't the port). When I run spark-shell on the master node 
> (setting it to --master spark://ip:7077) it runs everything correctly. When I 
> try to submit a job from my local machine however I get RPC timeout errors. 
> Does anyone know why this is or how to resolve it?
> 
> (cross posted at 
> http://stackoverflow.com/questions/36947811/application-submitted-to-remote-spark-from-local-pyspark-never-completes)
> 
> Thanks!


Timeout when submitting an application to a remote Spark Standalone master

2016-04-29 Thread Richard Han
I have an EC2 installation of Spark Standalone Master/Worker set up. The
two can talk to one another, and all ports are open in the security group
(just to make sure it isn't the port). When I run spark-shell on the master
node (setting it to --master spark://ip:7077) it runs everything correctly.
When I try to submit a job from my local machine however I get RPC timeout
errors. Does anyone know why this is or how to resolve it?

(cross posted at
http://stackoverflow.com/questions/36947811/application-submitted-to-remote-spark-from-local-pyspark-never-completes
)

Thanks!


DropDuplicates Behavior

2016-04-29 Thread Allen George
I'd like to echo a question that was asked earlier this year:

If we do a global sort of a dataframe (with two columns: col_1, col_2) by
(col_1, col_2/desc) and then dropDuplicates on col_1, will it retain the
first row of each sorted group? i.e. Will it return the row with the
greatest value of col_2 for each col_1 group?

Thanks,
Allen
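
For what it's worth, dropDuplicates makes no documented guarantee about which
row of each group survives, even after a sort. A window function makes the
choice explicit; a minimal sketch, assuming Spark 1.6+ and a DataFrame df with
columns col_1 and col_2:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

// Rank rows within each col_1 group by col_2 descending, keep the top one.
val w = Window.partitionBy("col_1").orderBy(desc("col_2"))

val firstPerGroup = df
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)   // the row with the greatest col_2 per col_1
  .drop("rn")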


Re: Dataframe fails for large resultsize

2016-04-29 Thread Buntu Dev
Just to provide more details: I have 200 blocks (parquet files) with an avg
block size of 70M. Limiting the result set to 100k ("select * from tbl
order by c1 limit 100000") works, but when I increase it to, say, 1M I keep
running into this error:
 Connection reset by peer: socket write error

I would ultimately want to store the result set as parquet. Are there any
other options to handle this?

Thanks!

On Wed, Apr 27, 2016 at 11:10 AM, Buntu Dev  wrote:

> I have 14GB of parquet data. I'm trying to apply an order by using Spark
> SQL and save the first 1M rows, but it keeps failing with "Connection reset
> by peer: socket write error" on the executors.
>
> I've allocated about 10g to both driver and the executors along with
> setting the maxResultSize to 10g but still fails with the same error. I'm
> using Spark 1.5.1.
>
> Are there any other alternative ways to handle this?
>
> Thanks!
>


how to orderBy previous groupBy.count.orderBy

2016-04-29 Thread Brent S. Elmer Ph.D.
I have the following simple example that I can't get to work correctly.

In [1]:

from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StructType, StructField, IntegerType,
StringType
from pyspark.sql.functions import asc, desc, sum, count
sqlContext = SQLContext(sc)

error_schema = StructType([
StructField('id', IntegerType(), nullable=False),
StructField('error_code', IntegerType(),
nullable=False),
StructField('error_desc', StringType(),
nullable=False)
])
error_data = sc.parallelize([
Row(1, 1, 'type 1 error'),
Row(1, 2, 'type 2 error'),
Row(2, 4, 'type 4 error'),
Row(2, 3, 'type 3 error'),
Row(2, 3, 'type 3 error'),
Row(2, 2, 'type 2 error'),
Row(2, 1, 'type 1 error'),
Row(3, 2, 'type 2 error'),
Row(3, 2, 'type 2 error'),
Row(3, 2, 'type 2 error'),
Row(3, 1, 'type 1 error'),
Row(3, 3, 'type 3 error'),
Row(3, 1, 'type 1 error'),
Row(3, 1, 'type 1 error'),
Row(3, 4, 'type 4 error'),
Row(3, 5, 'type 5 error'),
Row(3, 1, 'type 1 error'),
Row(3, 1, 'type 1 error'),
Row(3, 2, 'type 2 error'),
Row(3, 4, 'type 4 error'),
Row(3, 1, 'type 1 error'),

])
error_df = sqlContext.createDataFrame(error_data, error_schema)
error_df.show()
id_count =
error_df.groupBy(error_df["id"]).count().orderBy(desc("count"))
id_count.show()
error_df.groupBy(error_df["id"], error_df["error_code"],
error_df["error_desc"]).count().orderBy(id_count["id"],
desc("count")).show(20)

+---+----------+------------+
| id|error_code|  error_desc|
+---+----------+------------+
|  1|         1|type 1 error|
|  1|         2|type 2 error|
|  2|         4|type 4 error|
|  2|         3|type 3 error|
|  2|         3|type 3 error|
|  2|         2|type 2 error|
|  2|         1|type 1 error|
|  3|         2|type 2 error|
|  3|         2|type 2 error|
|  3|         2|type 2 error|
|  3|         1|type 1 error|
|  3|         3|type 3 error|
|  3|         1|type 1 error|
|  3|         1|type 1 error|
|  3|         4|type 4 error|
|  3|         5|type 5 error|
|  3|         1|type 1 error|
|  3|         1|type 1 error|
|  3|         2|type 2 error|
|  3|         4|type 4 error|
+---+----------+------------+
only showing top 20 rows

+---+-----+
| id|count|
+---+-----+
|  3|   14|
|  2|    5|
|  1|    2|
+---+-----+

+---+----------+------------+-----+
| id|error_code|  error_desc|count|
+---+----------+------------+-----+
|  1|         1|type 1 error|    1|
|  1|         2|type 2 error|    1|
|  2|         3|type 3 error|    2|
|  2|         2|type 2 error|    1|
|  2|         1|type 1 error|    1|
|  2|         4|type 4 error|    1|
|  3|         1|type 1 error|    6|
|  3|         2|type 2 error|    4|
|  3|         4|type 4 error|    2|
|  3|         3|type 3 error|    1|
|  3|         5|type 5 error|    1|
+---+----------+------------+-----+


In []:

What I would like is to end up with that last table ordered by the ids
that have the largest total error count, and within each id descending by
count. I would like the end result to be like this.

+---+----------+------------+-----+
| id|error_code|  error_desc|count|
+---+----------+------------+-----+
|  3|         1|type 1 error|    6|
|  3|         2|type 2 error|    4|
|  3|         4|type 4 error|    2|
|  3|         3|type 3 error|    1|
|  3|         5|type 5 error|    1|
|  2|         3|type 3 error|    2|
|  2|         2|type 2 error|    1|
|  2|         1|type 1 error|    1|
|  2|         4|type 4 error|    1|
|  1|         1|type 1 error|    1|
|  1|         2|type 2 error|    1|
+---+----------+------------+-----+

Because id 3 has the highest error count, id 2 the next highest, and id 1
the least.

What is the best way to do this?
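
One way to get that ordering, sketched here in Scala (the same DataFrame calls
exist in PySpark), assuming the error_df built above: join the per-id totals
back onto the detailed counts, then sort by the total first and the per-group
count second.

import org.apache.spark.sql.functions.{count, desc}

// Total errors per id, used only as a sort key.
val idTotals = error_df.groupBy("id").agg(count("*").as("total"))

// Errors per (id, error_code, error_desc).
val detail = error_df.groupBy("id", "error_code", "error_desc")
  .agg(count("*").as("count"))

detail.join(idTotals, "id")
  .orderBy(desc("total"), desc("count"))  // busiest ids first, then by count
  .drop("total")
  .show(20)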





Re: Various Apache Spark's deployment problems

2016-04-29 Thread Robineast
Do you need 2 --num-executors ?

Sent from my iPhone

> On 29 Apr 2016, at 20:25, Ashish Sharma [via Apache Spark User List] wrote:
> 
> Submit Command1: 
> 
> spark-submit --class working.path.to.Main \ 
> --master yarn \ 
> --deploy-mode cluster \ 
> --num-executors 17 \ 
> --executor-cores 8 \ 
> --executor-memory 25g \ 
> --driver-memory 25g \ 
> --num-executors 5 \ 
> application-with-all-dependencies.jar 
> 
> Error Log1: 
> 
> User class threw exception: java.lang.RuntimeException: Unable to 
> instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient 
> 
> Submit Command2: 
> 
> spark-submit --class working.path.to.Main \ 
> --master yarn \ 
> --deploy-mode cluster \ 
> --num-executors 17 \ 
> --executor-cores 8 \ 
> --executor-memory 25g \ 
> --driver-memory 25g \ 
> --num-executors 5 \ 
> --files /etc/hive/conf/hive-site.xml \ 
> application-with-all-dependencies.jar 
> 
> Error Log2: 
> 
> User class threw exception: java.lang.NumberFormatException: For 
> input string: "5s" 
> 
> Since I don't have the administrative permissions, I cannot modify the 
> configuration. Well, I can contact to the IT engineer and make the changes, 
> but I'm looking for the 
> solution that involves less changes in the configuration files, if possible! 
> 
> Configuration changes were suggested in here: 
> https://hadoopist.wordpress.com/2016/02/23/how-to-resolve-error-yarn-applicationmaster-user-class-threw-exception-java-lang-runtimeexception-java-lang-numberformatexception-for-input-string-5s-in-spark-submit/
> 
> Then I tried passing various jar files as arguments as suggested in other 
> discussion forums. 
> 
> Submit Command3: 
> 
> spark-submit --class working.path.to.Main \ 
> --master yarn \ 
> --deploy-mode cluster \ 
> --num-executors 17 \ 
> --executor-cores 8 \ 
> --executor-memory 25g \ 
> --driver-memory 25g \ 
> --num-executors 5 \ 
> --jars 
> /usr/hdp/2.3.0.0-2557/spark/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/2.3.0.0-2557/spark/lib/datanucleus-core-3.2.10.jar,/usr/hdp/2.3.0.0-2557/spark/lib/datanucleus-rdbms-3.2.9.jar
>  \ 
> --files /etc/hive/conf/hive-site.xml \ 
> application-with-all-dependencies.jar 
> 
> Error Log3: 
> 
> User class threw exception: java.lang.NumberFormatException: For 
> input string: "5s" 
> 
> I didn't understood what happened with the following command and couldn't 
> analyze the error log. 
> 
> Submit Command4: 
> 
> spark-submit --class working.path.to.Main \ 
> --master yarn \ 
> --deploy-mode cluster \ 
> --num-executors 17 \ 
> --executor-cores 8 \ 
> --executor-memory 25g \ 
> --driver-memory 25g \ 
> --num-executors 5 \ 
> --jars /usr/hdp/2.3.0.0-2557/spark/lib/*.jar \ 
> --files /etc/hive/conf/hive-site.xml \ 
> application-with-all-dependencies.jar 
> 
> Submit Log4: 
> 
> Application application_1461686223085_0014 failed 2 times due to AM 
> Container for appattempt_1461686223085_0014_02 exited with exitCode: 10 
> For more detailed output, check the application tracking page: 
> http://cluster-host:/cluster/app/application_1461686223085_0014 
> Then click on links to logs of each attempt. 
> Diagnostics: Exception from container-launch. 
> Container id: container_e10_1461686223085_0014_02_01 
> Exit code: 10 
> Stack trace: ExitCodeException exitCode=10: 
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:545) 
> at org.apache.hadoop.util.Shell.run(Shell.java:456) 
> at 
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722) 
> at 
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
>  
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>  
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>  
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  
> at 
> 

Re: What is the default value of rebalance.backoff.ms in Spark Kafka Direct?

2016-04-29 Thread swetha kasireddy
OK. Thanks Cody!

On Fri, Apr 29, 2016 at 12:41 PM, Cody Koeninger  wrote:

> If worker to broker communication breaks down, the worker will sleep
> for refresh.leader.backoff.ms before throwing an error, at which point
> normal spark task retry (spark.task.maxFailures) comes into play.
>
> If driver to broker communication breaks down, the driver will sleep
> for refresh.leader.backoff.ms before retrying the attempt to get
> offsets, up to spark.streaming.kafka.maxRetries number of times.
>
> The actual leader rebalancing process is entirely up to Kafka, which
> is why I'm saying if you're losing leaders, you should look at Kafka.
>
> On Fri, Apr 29, 2016 at 11:21 AM, swetha kasireddy
>  wrote:
> > OK. So the Kafka side they use rebalance.backoff.ms of 2000 which is a
> > default for rebalancing and they say that refresh.leader.backoff.ms of
> 200
> > to refresh leader is very aggressive and suggested us to increase it to
> > 2000. Even after increasing to 2500 I still get Leader Lost Errors.
> >
> > Is  refresh.leader.backoff.ms the right setting in the app for it to
> wait
> > till the leader election and rebalance is done from the Kafka side
> assuming
> > that Kafka has  rebalance.backoff.ms of 2000  ?
> >
> > Also, does Spark Kafka Direct try to restart the app when the leader is
> lost
> > or it will just wait till  refresh.leader.backoff.ms   and then retry
> again
> > depending on the number of retries?
> >
> > On Fri, Apr 29, 2016 at 8:14 AM, swetha kasireddy
> >  wrote:
> >>
> >> OK. So the Kafka side they use rebalance.backoff.ms of 2000 which is a
> >> default for rebalancing and they say that refresh.leader.backoff.ms of
> 200
> >> to refresh leader is very aggressive and suggested us to increase it to
> >> 2000. Even after increasing to 2500 I still get Leader Lost Errors.
> >>
> >> Is  refresh.leader.backoff.ms the right setting in the app for it to
> wait
> >> till the leader election and rebalance is done from the Kafka side
> assuming
> >> that Kafka has  rebalance.backoff.ms of 2000  ?
> >>
> >> On Wed, Apr 27, 2016 at 11:05 AM, Cody Koeninger 
> >> wrote:
> >>>
> >>> Seems like it'd be better to look into the Kafka side of things to
> >>> determine why you're losing leaders frequently, as opposed to trying
> >>> to put a bandaid on it.
> >>>
> >>> On Wed, Apr 27, 2016 at 11:49 AM, SRK 
> wrote:
> >>> > Hi,
> >>> >
> >>> > We seem to be getting a lot of LeaderLostExceptions and our source
> >>> > Stream is
> >>> > working with a default value of rebalance.backoff.ms which is 2000.
> I
> >>> > was
> >>> > thinking to increase this value to 5000. Any suggestions on  this?
> >>> >
> >>> > Thanks!
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > View this message in context:
> >>> >
> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-default-value-of-rebalance-backoff-ms-in-Spark-Kafka-Direct-tp26840.html
> >>> > Sent from the Apache Spark User List mailing list archive at
> >>> > Nabble.com.
> >>> >
> >>> > -
> >>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >>> > For additional commands, e-mail: user-h...@spark.apache.org
> >>> >
> >>
> >>
> >
>


Re: What is the default value of rebalance.backoff.ms in Spark Kafka Direct?

2016-04-29 Thread Cody Koeninger
If worker to broker communication breaks down, the worker will sleep
for refresh.leader.backoff.ms before throwing an error, at which point
normal spark task retry (spark.task.maxFailures) comes into play.

If driver to broker communication breaks down, the driver will sleep
for refresh.leader.backoff.ms before retrying the attempt to get
offsets, up to spark.streaming.kafka.maxRetries number of times.

The actual leader rebalancing process is entirely up to Kafka, which
is why I'm saying if you're losing leaders, you should look at Kafka.
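
For reference, a minimal sketch of where each of those settings lives, assuming
the Spark 1.x direct stream; broker addresses and values are illustrative only.

import org.apache.spark.SparkConf

// Spark-side retry knobs (task level and driver level).
val conf = new SparkConf()
  .set("spark.task.maxFailures", "8")            // worker-side task retries
  .set("spark.streaming.kafka.maxRetries", "3")  // driver-side offset-fetch retries

// Kafka-side knob, passed as kafkaParams to KafkaUtils.createDirectStream;
// it is the Kafka client, not Spark, that honors it.
val kafkaParams = Map(
  "metadata.broker.list"      -> "broker1:9092,broker2:9092",
  "refresh.leader.backoff.ms" -> "2000"          // sleep before re-fetching a lost leader
)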

On Fri, Apr 29, 2016 at 11:21 AM, swetha kasireddy
 wrote:
> OK. So the Kafka side they use rebalance.backoff.ms of 2000 which is a
> default for rebalancing and they say that refresh.leader.backoff.ms of 200
> to refresh leader is very aggressive and suggested us to increase it to
> 2000. Even after increasing to 2500 I still get Leader Lost Errors.
>
> Is  refresh.leader.backoff.ms the right setting in the app for it to wait
> till the leader election and rebalance is done from the Kafka side assuming
> that Kafka has  rebalance.backoff.ms of 2000  ?
>
> Also, does Spark Kafka Direct try to restart the app when the leader is lost
> or it will just wait till  refresh.leader.backoff.ms   and then retry again
> depending on the number of retries?
>
> On Fri, Apr 29, 2016 at 8:14 AM, swetha kasireddy
>  wrote:
>>
>> OK. So the Kafka side they use rebalance.backoff.ms of 2000 which is a
>> default for rebalancing and they say that refresh.leader.backoff.ms of 200
>> to refresh leader is very aggressive and suggested us to increase it to
>> 2000. Even after increasing to 2500 I still get Leader Lost Errors.
>>
>> Is  refresh.leader.backoff.ms the right setting in the app for it to wait
>> till the leader election and rebalance is done from the Kafka side assuming
>> that Kafka has  rebalance.backoff.ms of 2000  ?
>>
>> On Wed, Apr 27, 2016 at 11:05 AM, Cody Koeninger 
>> wrote:
>>>
>>> Seems like it'd be better to look into the Kafka side of things to
>>> determine why you're losing leaders frequently, as opposed to trying
>>> to put a bandaid on it.
>>>
>>> On Wed, Apr 27, 2016 at 11:49 AM, SRK  wrote:
>>> > Hi,
>>> >
>>> > We seem to be getting a lot of LeaderLostExceptions and our source
>>> > Stream is
>>> > working with a default value of rebalance.backoff.ms which is 2000. I
>>> > was
>>> > thinking to increase this value to 5000. Any suggestions on  this?
>>> >
>>> > Thanks!
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> > http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-default-value-of-rebalance-backoff-ms-in-Spark-Kafka-Direct-tp26840.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> > Nabble.com.
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>
>>
>




Re: Sanely updating parquet partitions.

2016-04-29 Thread Philip Weaver
Hello, I think I may have jumped to the wrong conclusion about symlinks,
and I was able to get what I want working perfectly.

I added these two settings in my importer application:

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs",
"false")

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")


Then when I read the parquet table, I set the "basePath" option to the
parent of each of the partitions, e.g.:

val df = sqlContext.read.options(Map("basePath" ->
"/path/to/table")).parquet("/path/to/table/a=*")


I also checked that the symlinks were followed the way I wanted, by
removing one of the symlinks after creating the DataFrame, and I was able
to query the DataFrame without error.

- Philip


On Fri, Apr 29, 2016 at 9:56 AM, Philip Weaver 
wrote:

> Hello,
>
> I have a parquet dataset, partitioned by a column 'a'. I want to take
> advantage
> of Spark SQL's ability to filter to the partition when you filter on 'a'.
> I also
> want to periodically update individual partitions without disrupting any
> jobs
> that are querying the data.
>
> The obvious solution was to write parquet datasets to a separate directory
> and
> then update a symlink to point to it. Readers resolve the symlink to
> construct
> the DataFrame, so that when an update occurs any jobs continue to read the
> version of the data that they started with. Old data is cleaned up after
> no jobs
> are using it.
>
> This strategy works fine when updating an entire top-level parquet
> database. However, it seems like Spark SQL (or parquet) cannot handle
> partition
> directories being symlinks (and even if it could, it probably wouldn't
> resolve
> those symlinks so that it doesn't blow up when the symlink changes at
> runtime). For example, if you create symlinks a=1, a=2 and a=3 in a
> directory
> and then try to load that directory in Spark SQL, you get the "Conflicting
> partition column names detected".
>
> So my question is, can anyone think of another solution that meets my
> requirements (i.e. to take advantage of paritioning and perform safe
> updates of
> existing partitions)?
>
> Thanks!
>
> - Philip
>
>
>


Re: aggregateByKey - external combine function

2016-04-29 Thread Nirav Patel
Any thoughts?

I can explain more about the problem, but basically the shuffle data doesn't
seem to fit in reducer memory (32GB) and I am looking for ways to process it
on disk+memory.

Thanks

On Thu, Apr 28, 2016 at 10:07 AM, Nirav Patel  wrote:

> Hi,
>
> I tried to convert a groupByKey operation to aggregateByKey in a hope to
> avoid memory and high gc issue when dealing with 200GB of data.
> I needed to create a Collection of resulting key-value pairs which
> represent all combinations of given key.
>
> My merge fun definition is as follows:
>
> private def processDataMerge(map1: collection.mutable.Map[String,
> UserDataSet],
>   map2:
> collection.mutable.Map[String, UserDataSet])
> : collection.mutable.Map[String, UserDataSet] = {
>
> //psuedo code
>
> map1 + map2
> (Set[combEle1], Set[combEle2] ... ) = map1.map(...extract all elements
> here)
> comb1 = cominatorics(Set[CombELe1])
> ..
> totalcombinations = comb1 + comb2 + ..
>
> map1 + totalcombinations.map(comb => (comb -> UserDataSet))
>
> }
>
>
> Output of one merge(or seq) is basically combinations of input collection
> elements and so and so on. So finally you get all combinations for given
> key.
>
> It's performing worse using aggregateByKey than groupByKey with the same
> configuration. groupByKey used to halt at the last 9 partitions out of 4000;
> this one halts even earlier (due to high GC). I kill the job
> after it halts for hours on the same task.
>
> I give 25GB executor memory and 4GB overhead. My cluster can't allocate
> more than 32GB per executor.
>
> I thought of custom partitioning my keys so there's less data per key and
> hence fewer combinations. That would help with data skew, but wouldn't it in
> the end come to the same thing? At some point it will need to merge
> key-values spread across different salts, and it will hit the memory issue at
> that point!
>
> Any pointer to resolve this? perhaps an external merge ?
>
> Thanks
> Nirav
>
>
>
> Thanks
>
>
>
>
>
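
For reference, a minimal sketch of the salting idea raised above, with
placeholder types standing in for UserDataSet. As noted, this relieves skew
during the partial merges but does not shrink the final per-key result, so it
only helps when the intermediate maps, not the final ones, are what blow up.

import org.apache.spark.rdd.RDD
import scala.util.Random

type V = collection.mutable.Map[String, Long]  // stand-in for the real value type

def merge(a: V, b: V): V = {
  b.foreach { case (k, n) => a(k) = a.getOrElse(k, 0L) + n }
  a
}

def saltedAggregate(pairs: RDD[(String, V)], salts: Int = 32): RDD[(String, V)] =
  pairs
    .map { case (k, v) => ((k, Random.nextInt(salts)), v) }  // shard each hot key
    .reduceByKey(merge)                                      // phase 1: small partial merges
    .map { case ((k, _), v) => (k, v) }                      // drop the salt
    .reduceByKey(merge)                                      // phase 2: final merge per key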




Sanely updating parquet partitions.

2016-04-29 Thread Philip Weaver
Hello,

I have a parquet dataset, partitioned by a column 'a'. I want to take
advantage
of Spark SQL's ability to filter to the partition when you filter on 'a'. I
also
want to periodically update individual partitions without disrupting any
jobs
that are querying the data.

The obvious solution was to write parquet datasets to a separate directory
and
then update a symlink to point to it. Readers resolve the symlink to
construct
the DataFrame, so that when an update occurs any jobs continue to read the
version of the data that they started with. Old data is cleaned up after no
jobs
are using it.

This strategy works fine when updating an entire top-level parquet
database. However, it seems like Spark SQL (or parquet) cannot handle
partition
directories being symlinks (and even if it could, it probably wouldn't
resolve
those symlinks so that it doesn't blow up when the symlink changes at
runtime). For example, if you create symlinks a=1, a=2 and a=3 in a
directory
and then try to load that directory in Spark SQL, you get the "Conflicting
partition column names detected".

So my question is, can anyone think of another solution that meets my
requirements (i.e. to take advantage of paritioning and perform safe
updates of
existing partitions)?

Thanks!

- Philip


Re: (YARN CLUSTER MODE) Where to find logs within Spark RDD processing function ?

2016-04-29 Thread nguyen duc tuan
What does the web UI show? What do you see when you click on the "stderr" and
"stdout" links? These links must contain stdout and stderr for each
executor.
About your custom logging in the executor, are you sure you checked
"${spark.yarn.app.container.log.dir}/spark-app.log"?
The actual location of this file on each executor is
${yarn.nodemanager.remote-app-log-dir}/${applicationId}/${spark.yarn.app.container.log.dir}/spark-app.log
(the yarn.nodemanager.remote-app-log-dir setting can be found in yarn-site.xml
in the Hadoop config folder).
For example, when I click the "stdout" link for hslave-13, I get the link
http://hslave-13:8042/node/containerlogs/container_1459219311185_2456_01_04/tuannd/stdout?start=-4096,
which means the location of the file on hslave-13 is
${yarn.nodemanager.remote-app-log-dir}/appId/container_1459219311185_2456_01_04/spark-app.log

I also see that you forgot to send the file "log4j.properties" to the executors
in the spark-submit command. Each executor will try to find log4j.properties in
its working folder; if the file is not found, the logging settings
will be ignored.
You have to add --files /path/to/your/log4j.properties in order
to send this file to the executors.

Finally, in order to debug what is happening in the executors, you should write
it directly to stdout or stderr. It's much easier to check than going directly
to the executor and finding your log file :)
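
Putting those two points together, a sketch of the submit command (the class
name and paths are placeholders): --files ships log4j.properties into each
container's working directory, so the executor-side -Dlog4j.configuration can
then reference it by bare name.

spark-submit --class your.main.Class \
  --master yarn --deploy-mode cluster \
  --files /local/path/log4j.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  application-with-all-dependencies.jar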

2016-04-29 21:30 GMT+07:00 dev loper :

> Hi Ted & Nguyen,
>
> @Ted , I was under the belief that if the log4j.properties file would be
> taken from the application classpath if  file path is not specified.
> Please correct me if I am wrong. I tried your approach as well still I
> couldn't find the logs.
>
> @nguyen I am running it on a Yarn cluster , so Spark UI is redirecting me
> to Yarn UI. I couldn't see the logs there as well. I checked the logs on
> both Master and worker. I am running a cluster with one master and one
> worker.  Even I tired yarn logs there also its not turning up. Does yarn
> logs  include executor logs as well ?
>
>
> Request your help to identify the issue .
>
> On Fri, Apr 29, 2016 at 7:32 PM, Ted Yu  wrote:
>
>> Please use the following syntax:
>>
>> --conf
>>  
>> "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///local/file/log4j.properties"
>>
>> FYI
>>
>> On Fri, Apr 29, 2016 at 6:03 AM, dev loper  wrote:
>>
>>> Hi Spark Team,
>>>
>>> I have asked the same question on stack overflow  , no luck yet.
>>>
>>>
>>> http://stackoverflow.com/questions/36923949/where-to-find-logs-within-spark-rdd-processing-function-yarn-cluster-mode?noredirect=1#comment61419406_36923949
>>>
>>> I am running my Spark Application on Yarn Cluster. No matter what I do,
>>> I am not able to get the logs within the RDD function printed . Below you
>>> can find the sample snippet which I have written for the RDD processing
>>> function . I have simplified the code to illustrate the syntax I have used
>>> to write the function. When I am running it locally I am able to see the
>>> logs but not in cluster mode. Neither System.err.println nor the logger
>>> seems to be working. But I could see all my driver logs. I even tried to
>>> log using the Root logger , but it was not working at all within the RDD
>>> processing function .I was desperate to see the log messages so finally I
>>> found a guide to use logger as transient (
>>> https://www.mapr.com/blog/how-log-apache-spark), but even that didn't
>>> help
>>>
>>> class SampleFlatMapFunction implements PairFlatMapFunction<Tuple2<String,String>,String,String> {
>>>
>>> private static final long serialVersionUID = 6565656322667L;
>>> transient Logger  executorLogger = 
>>> LogManager.getLogger("sparkExecutor");
>>>
>>>
>>> private void readObject(java.io.ObjectInputStream in)
>>> throws IOException, ClassNotFoundException {
>>> in.defaultReadObject();
>>> executorLogger = LogManager.getLogger("sparkExecutor");
>>> }
>>> @Override
>>> public Iterable<Tuple2<String,String>> call(Tuple2<String,String> tuple) throws Exception {
>>>
>>> executorLogger.info(" log testing from  executorLogger ::");
>>> System.err.println(" log testing from  executorLogger system error 
>>> stream ");
>>>
>>>
>>> List<Tuple2<String,String>> updates = new ArrayList<>();
>>> //process Tuple , expand and add it to list.
>>> return updates;
>>>
>>>  }
>>>  };
>>>
>>> My Log4j Configuration is given below
>>>
>>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>>> log4j.appender.console.target=System.err
>>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>>> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} 
>>> %p %c{1}: %m%n
>>>
>>> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
>>> 

Re: Spark support for Complex Event Processing (CEP)

2016-04-29 Thread Michael Segel
If you’re getting the logs, then it really isn’t CEP unless you consider the 
event to be the log from the bus. 
This doesn’t sound like there is a time constraint. 

Your bus schedule is fairly fixed and changes infrequently. 
Your bus stops are relatively fixed points. (Within a couple of meters) 

So then you’re taking bus A who is scheduled to drive route 123 and you want to 
compare their nearest location to the bus stop at time T and see how close it 
is to the scheduled route. 


Or am I missing something? 

-Mike

> On Apr 29, 2016, at 3:54 AM, Esa Heikkinen  
> wrote:
> 
> 
> Hi
> 
> I try to explain my case ..
> 
> Situation is not so simple in my logs and solution. There also many types of 
> logs and there are from many sources.
> They are as csv-format and header line includes names of the columns.
> 
> This is simplified description of input logs.
> 
> LOG A's: bus coordinate logs (every bus has own log):
> - timestamp
> - bus number
> - coordinates
> 
> LOG B: bus login/logout (to/from line) message log:
> - timestamp
> - bus number
> - line number
> 
> LOG C:  log from central computers:
> - timestamp
> - bus number
> - bus stop number
> - estimated arrival time to bus stop
> 
> LOG A are updated every 30 seconds (i have also another system by 1 seconds 
> interval). LOG B are updated when bus starts from terminal bus stop and 
> arrives to final bus stop in a line. LOG C is updated when central computer 
> sends new arrival time estimation to bus stop.
> 
> I also need metadata for logs (and analyzer). For example coordinates for bus 
> stop areas.
> 
> Main purpose of analyzing is to check an accuracy (error) of the estimated 
> arrival time to bus stops.
> 
> Because there are many buses and lines, it is too time-consuming to check all
> of them. So i check only specific lines with specific bus stops. There are 
> many buses (logged to lines) coming to one bus stop and i am interested about 
> only certain bus.
> 
> To do that, i have to read log partly not in time order (upstream) by 
> sequence:
> 1. From LOG C is searched bus number
> 2. From LOG A is searched when the bus has leaved from terminal bus stop
> 3. From LOG B is searched when bus has sent a login to the line
> 4. From LOG A is searched when the bus has entered to bus stop
> 5. From LOG C is searched a last estimated arrival time to the bus stop and 
> calculates error between real and estimated value
> 
> In my understanding (almost) all log file analyzers reads all data (lines) in 
> time order from log files. My need is only for specific part of log (lines). 
> To achieve that, my solution is to read logs in an arbitrary order (with 
> given time window).
> 
> I know this solution is not suitable for all cases (for example for very fast 
> analyzing and very big data). This solution is suitable for very complex 
> (targeted) analyzing. It can be too slow and memory-consuming, but well done 
> pre-processing of log data can help a lot.
> 
> ---
> Esa Heikkinen
> 
> 28.4.2016, 14:44, Michael Segel kirjoitti:
>> I don’t.
>> 
>> I believe that there have been a  couple of hack-a-thons like one done in 
>> Chicago a few years back using public transportation data.
>> 
>> The first question is what sort of data do you get from the city? 
>> 
>> I mean it could be as simple as time_stamp, bus_id, route and GPS (x,y).   
>> Or they could provide more information. Like last stop, distance to next 
>> stop, avg current velocity… 
>> 
>> Then there is the frequency of the updates. Every second? Every 3 seconds? 5 
>> or 6 seconds…
>> 
>> This will determine how much work you have to do. 
>> 
>> Maybe they provide the routes of the busses via a different API call since 
>> its relatively static.
>> 
>> This will drive your solution more than the underlying technology. 
>> 
>> Oh and while I focused on buses, there are also rail and other modes of public 
>> transportation like light rail, trains, etc … 
>> 
>> HTH
>> 
>> -Mike
>> 
>> 
>>> On Apr 28, 2016, at 4:10 AM, Esa Heikkinen >> > wrote:
>>> 
>>> 
>>> Do you know any good examples how to use Spark streaming in tracking public 
>>> transportation systems ?
>>> 
>>> Or Storm or some other tool example ?
>>> 
>>> Regards
>>> Esa Heikkinen
>>> 
>>> 28.4.2016, 3:16, Michael Segel kirjoitti:
 Uhm… 
 I think you need to clarify a couple of things…
 
 First there is this thing called analog signal processing…. Is that 
 continuous enough for you? 
 
 But more to the point, Spark Streaming does micro batching so if you’re 
 processing a continuous stream of tick data, you will have more than 50K 
 of ticks per second while there are markets open and trading.  Even at 50K 
 a second, that would mean 1 every .02 ms or 50 ticks a ms. 
 
 And you don’t want to wait until you have a batch to start processing, but 
 you want to process when the 

Re: What is the default value of rebalance.backoff.ms in Spark Kafka Direct?

2016-04-29 Thread swetha kasireddy
OK. So on the Kafka side they use rebalance.backoff.ms of 2000, which is the
default for rebalancing, and they say that a refresh.leader.backoff.ms of 200
to refresh the leader is very aggressive, and suggested we increase it to
2000. Even after increasing it to 2500 I still get Leader Lost errors.

Is refresh.leader.backoff.ms the right setting in the app for it to wait
until the leader election and rebalance are done on the Kafka side, assuming
that Kafka has rebalance.backoff.ms of 2000?

Also, does Spark Kafka Direct try to restart the app when the leader is
lost, or will it just wait for refresh.leader.backoff.ms and then retry
again, depending on the number of retries?

On Fri, Apr 29, 2016 at 8:14 AM, swetha kasireddy  wrote:

> OK. So the Kafka side they use rebalance.backoff.ms of 2000 which is a
> default for rebalancing and they say that refresh.leader.backoff.ms of
> 200 to refresh leader is very aggressive and suggested us to increase it to
> 2000. Even after increasing to 2500 I still get Leader Lost Errors.
>
> Is  refresh.leader.backoff.ms the right setting in the app for it to wait
> till the leader election and rebalance is done from the Kafka side assuming
> that Kafka has  rebalance.backoff.ms of 2000  ?
>
> On Wed, Apr 27, 2016 at 11:05 AM, Cody Koeninger 
> wrote:
>
>> Seems like it'd be better to look into the Kafka side of things to
>> determine why you're losing leaders frequently, as opposed to trying
>> to put a bandaid on it.
>>
>> On Wed, Apr 27, 2016 at 11:49 AM, SRK  wrote:
>> > Hi,
>> >
>> > We seem to be getting a lot of LeaderLostExceptions and our source
>> Stream is
>> > working with a default value of rebalance.backoff.ms which is 2000. I
>> was
>> > thinking to increase this value to 5000. Any suggestions on  this?
>> >
>> > Thanks!
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-default-value-of-rebalance-backoff-ms-in-Spark-Kafka-Direct-tp26840.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>
>


Bit(N) on create Table with MSSQLServer

2016-04-29 Thread Andrés Ivaldi
Hello, Spark is executing a CREATE TABLE statement (using JDBC) against
MSSQLServer with a column type mapping like "ColName BIT(1)" for boolean
types. This CREATE TABLE cannot be executed on MSSQLServer.

In the class JdbcDialect the mapping for the Boolean type is BIT(1), so the
question is: is this a problem in Spark, or in the JDBC driver that is not
mapping it correctly?

Anyway, is it possible to override that mapping in Spark?

Regards

-- 
Ing. Ivaldi Andres
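
It is possible to override the mapping by registering a custom dialect; a
minimal sketch, assuming Spark 1.4+ where
org.apache.spark.sql.jdbc.JdbcDialects.registerDialect is available (plain BIT,
without a length, is what SQL Server accepts for booleans):

import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{BooleanType, DataType}

object MsSqlServerDialect extends JdbcDialect {
  // Claim JDBC URLs that point at SQL Server.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

  // Emit BIT instead of the default BIT(1) for boolean columns;
  // return None for everything else to fall back to the defaults.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case BooleanType => Some(JdbcType("BIT", Types.BIT))
    case _           => None
  }
}

// Register before writing through JDBC so CREATE TABLE uses the new mapping.
JdbcDialects.registerDialect(MsSqlServerDialect)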


Re: What is the default value of rebalance.backoff.ms in Spark Kafka Direct?

2016-04-29 Thread swetha kasireddy
OK. So on the Kafka side they use rebalance.backoff.ms of 2000, which is the
default for rebalancing, and they say that a refresh.leader.backoff.ms of 200
to refresh the leader is very aggressive, and suggested we increase it to
2000. Even after increasing it to 2500 I still get Leader Lost errors.

Is refresh.leader.backoff.ms the right setting in the app for it to wait
until the leader election and rebalance are done on the Kafka side, assuming
that Kafka has rebalance.backoff.ms of 2000?

On Wed, Apr 27, 2016 at 11:05 AM, Cody Koeninger  wrote:

> Seems like it'd be better to look into the Kafka side of things to
> determine why you're losing leaders frequently, as opposed to trying
> to put a bandaid on it.
>
> On Wed, Apr 27, 2016 at 11:49 AM, SRK  wrote:
> > Hi,
> >
> > We seem to be getting a lot of LeaderLostExceptions and our source
> Stream is
> > working with a default value of rebalance.backoff.ms which is 2000. I
> was
> > thinking to increase this value to 5000. Any suggestions on  this?
> >
> > Thanks!
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-default-value-of-rebalance-backoff-ms-in-Spark-Kafka-Direct-tp26840.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: (YARN CLUSTER MODE) Where to find logs within Spark RDD processing function ?

2016-04-29 Thread dev loper
Hi Ted & Nguyen,

@Ted, I was under the belief that the log4j.properties file would be
taken from the application classpath if a file path is not specified.
Please correct me if I am wrong. I tried your approach as well; still I
couldn't find the logs.

@nguyen I am running it on a YARN cluster, so the Spark UI is redirecting me
to the YARN UI. I couldn't see the logs there either. I checked the logs on
both master and worker (I am running a cluster with one master and one
worker). I even tried yarn logs; there also it's not turning up. Does yarn
logs include executor logs as well?


Request your help to identify the issue .

On Fri, Apr 29, 2016 at 7:32 PM, Ted Yu  wrote:

> Please use the following syntax:
>
> --conf
>  
> "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///local/file/log4j.properties"
>
> FYI
>
> On Fri, Apr 29, 2016 at 6:03 AM, dev loper  wrote:
>
>> Hi Spark Team,
>>
>> I have asked the same question on stack overflow  , no luck yet.
>>
>>
>> http://stackoverflow.com/questions/36923949/where-to-find-logs-within-spark-rdd-processing-function-yarn-cluster-mode?noredirect=1#comment61419406_36923949
>>
>> I am running my Spark Application on Yarn Cluster. No matter what I do, I
>> am not able to get the logs within the RDD function printed . Below you can
>> find the sample snippet which I have written for the RDD processing
>> function . I have simplified the code to illustrate the syntax I have used
>> to write the function. When I am running it locally I am able to see the
>> logs but not in cluster mode. Neither System.err.println nor the logger
>> seems to be working. But I could see all my driver logs. I even tried to
>> log using the Root logger , but it was not working at all within the RDD
>> processing function .I was desperate to see the log messages so finally I
>> found a guide to use logger as transient (
>> https://www.mapr.com/blog/how-log-apache-spark), but even that didn't
>> help
>>
>> class SampleFlatMapFunction implements PairFlatMapFunction<Tuple2<String,String>,String,String> {
>>
>> private static final long serialVersionUID = 6565656322667L;
>> transient Logger  executorLogger = LogManager.getLogger("sparkExecutor");
>>
>>
>> private void readObject(java.io.ObjectInputStream in)
>> throws IOException, ClassNotFoundException {
>> in.defaultReadObject();
>> executorLogger = LogManager.getLogger("sparkExecutor");
>> }
>> @Override
>> public Iterable<Tuple2<String,String>> call(Tuple2<String,String> tuple) throws Exception {
>>
>> executorLogger.info(" log testing from  executorLogger ::");
>> System.err.println(" log testing from  executorLogger system error 
>> stream ");
>>
>>
>> List<Tuple2<String,String>> updates = new ArrayList<>();
>> //process Tuple , expand and add it to list.
>> return updates;
>>
>>  }
>>  };
>>
>> My Log4j Configuration is given below
>>
>> log4j.appender.console=org.apache.log4j.ConsoleAppender
>> log4j.appender.console.target=System.err
>> log4j.appender.console.layout=org.apache.log4j.PatternLayout
>> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
>> %c{1}: %m%n
>>
>> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
>> log4j.appender.stdout.target=System.out
>> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
>> log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
>> %c{1}: %m%n
>>
>> log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
>> log4j.appender.RollingAppender.File=/var/log/spark/spark.log
>> log4j.appender.RollingAppender.DatePattern='.'yyyy-MM-dd
>> log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
>> log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - 
>> %m%n
>>
>> log4j.appender.RollingAppenderU=org.apache.log4j.DailyRollingFileAppender
>> 
>> log4j.appender.RollingAppenderU.File=${spark.yarn.app.container.log.dir}/spark-app.log
>> log4j.appender.RollingAppenderU.DatePattern='.'yyyy-MM-dd
>> log4j.appender.RollingAppenderU.layout=org.apache.log4j.PatternLayout
>> log4j.appender.RollingAppenderU.layout.ConversionPattern=[%p] %d %c %M - 
>> %m%n
>>
>>
>> # By default, everything goes to console and file
>> log4j.rootLogger=INFO, RollingAppender, console
>>
>> # My custom logging goes to another file
>> log4j.logger.sparkExecutor=INFO, stdout, RollingAppenderU
>>
>>
>> i have tried yarn logs, Spark UI Logs nowhere I could see the log
>> statements from RDD processing functions . I tried below Approaches but it
>> didn't work
>>
>> yarn logs -applicationId
>>
>> I checked even below HDFS path also
>>
>> /tmp/logs/
>>
>>
>> I am running my spark-submit command by passing below arguments, Even
>> then its not working
>>
>>   --master 

Re: (YARN CLUSTER MODE) Where to find logs within Spark RDD processing function ?

2016-04-29 Thread nguyen duc tuan
These are the executors' logs, not the driver logs. To see these log files, you
have to go to the executor machines where the tasks are running. To see what you
print to stdout or stderr, you can either go to the executor machines
directly (it is stored in "stdout" and "stderr" files somewhere on the
executor machine) or view it through the web UI.

2016-04-29 20:03 GMT+07:00 dev loper :

> Hi Spark Team,
>
> I have asked the same question on stack overflow  , no luck yet.
>
>
> http://stackoverflow.com/questions/36923949/where-to-find-logs-within-spark-rdd-processing-function-yarn-cluster-mode?noredirect=1#comment61419406_36923949
>
> I am running my Spark Application on Yarn Cluster. No matter what I do, I
> am not able to get the logs within the RDD function printed . Below you can
> find the sample snippet which I have written for the RDD processing
> function . I have simplified the code to illustrate the syntax I have used
> to write the function. When I am running it locally I am able to see the
> logs but not in cluster mode. Neither System.err.println nor the logger
> seems to be working. But I could see all my driver logs. I even tried to
> log using the Root logger , but it was not working at all within the RDD
> processing function .I was desperate to see the log messages so finally I
> found a guide to use logger as transient (
> https://www.mapr.com/blog/how-log-apache-spark), but even that didn't
> help
>
> class SampleFlatMapFunction implements PairFlatMapFunction<Tuple2<String,String>,String,String> {
>
> private static final long serialVersionUID = 6565656322667L;
> transient Logger  executorLogger = LogManager.getLogger("sparkExecutor");
>
>
> private void readObject(java.io.ObjectInputStream in)
> throws IOException, ClassNotFoundException {
> in.defaultReadObject();
> executorLogger = LogManager.getLogger("sparkExecutor");
> }
> @Override
> public Iterable<Tuple2<String,String>> call(Tuple2<String,String> tuple) throws Exception {
>
> executorLogger.info(" log testing from  executorLogger ::");
> System.err.println(" log testing from  executorLogger system error 
> stream ");
>
>
> List<Tuple2<String,String>> updates = new ArrayList<>();
> //process Tuple , expand and add it to list.
> return updates;
>
>  }
>  };
>
> My Log4j Configuration is given below
>
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.target=System.err
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
> %c{1}: %m%n
>
> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> log4j.appender.stdout.target=System.out
> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
> %c{1}: %m%n
>
> log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
> log4j.appender.RollingAppender.File=/var/log/spark/spark.log
> log4j.appender.RollingAppender.DatePattern='.'yyyy-MM-dd
> log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - 
> %m%n
>
> log4j.appender.RollingAppenderU=org.apache.log4j.DailyRollingFileAppender
> 
> log4j.appender.RollingAppenderU.File=${spark.yarn.app.container.log.dir}/spark-app.log
> log4j.appender.RollingAppenderU.DatePattern='.'yyyy-MM-dd
> log4j.appender.RollingAppenderU.layout=org.apache.log4j.PatternLayout
> log4j.appender.RollingAppenderU.layout.ConversionPattern=[%p] %d %c %M - 
> %m%n
>
>
> # By default, everything goes to console and file
> log4j.rootLogger=INFO, RollingAppender, console
>
> # My custom logging goes to another file
> log4j.logger.sparkExecutor=INFO, stdout, RollingAppenderU
>
>
> i have tried yarn logs, Spark UI Logs nowhere I could see the log
> statements from RDD processing functions . I tried below Approaches but it
> didn't work
>
> yarn logs -applicationId
>
> I checked even below HDFS path also
>
> /tmp/logs/
>
>
> I am running my spark-submit command by passing below arguments, Even then
> its not working
>
>   --master yarn --deploy-mode cluster   --conf 
> "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"  
> --conf 
> "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
>
> Can somebody guide me on logging within spark RDD and map functions ? What
> am I missing in the above steps ?
>
> Thanks
>
> Dev
>


(YARN CLUSTER MODE) Where to find logs within Spark RDD processing function ?

2016-04-29 Thread dev loper
Hi Spark Team,

I have asked the same question on stack overflow  , no luck yet.

http://stackoverflow.com/questions/36923949/where-to-find-logs-within-spark-rdd-processing-function-yarn-cluster-mode?noredirect=1#comment61419406_36923949

I am running my Spark Application on Yarn Cluster. No matter what I do, I
am not able to get the logs within the RDD function printed . Below you can
find the sample snippet which I have written for the RDD processing
function . I have simplified the code to illustrate the syntax I have used
to write the function. When I am running it locally I am able to see the
logs but not in cluster mode. Neither System.err.println nor the logger
seems to be working. But I could see all my driver logs. I even tried to
log using the Root logger , but it was not working at all within the RDD
processing function .I was desperate to see the log messages so finally I
found a guide to use logger as transient (
https://www.mapr.com/blog/how-log-apache-spark), but even that didn't help

class SampleFlatMapFunction implements PairFlatMapFunction<Tuple2<String,String>,String,String> {

private static final long serialVersionUID = 6565656322667L;
transient Logger  executorLogger = LogManager.getLogger("sparkExecutor");


private void readObject(java.io.ObjectInputStream in)
throws IOException, ClassNotFoundException {
in.defaultReadObject();
executorLogger = LogManager.getLogger("sparkExecutor");
}
@Override
public Iterable<Tuple2<String,String>> call(Tuple2<String,String> tuple) throws Exception {

executorLogger.info(" log testing from  executorLogger ::");
System.err.println(" log testing from  executorLogger system
error stream ");


List<Tuple2<String,String>> updates = new ArrayList<>();
//process Tuple , expand and add it to list.
return updates;

 }
 };

My Log4j Configuration is given below

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd
HH:mm:ss} %p %c{1}: %m%n

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd
HH:mm:ss} %p %c{1}: %m%n

log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingAppender.File=/var/log/spark/spark.log
log4j.appender.RollingAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n

log4j.appender.RollingAppenderU=org.apache.log4j.DailyRollingFileAppender

log4j.appender.RollingAppenderU.File=${spark.yarn.app.container.log.dir}/spark-app.log
log4j.appender.RollingAppenderU.DatePattern='.'yyyy-MM-dd
log4j.appender.RollingAppenderU.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppenderU.layout.ConversionPattern=[%p] %d
%c %M - %m%n


# By default, everything goes to console and file
log4j.rootLogger=INFO, RollingAppender, console

# My custom logging goes to another file
log4j.logger.sparkExecutor=INFO, stdout, RollingAppenderU


i have tried yarn logs, Spark UI Logs nowhere I could see the log
statements from RDD processing functions . I tried below Approaches but it
didn't work

yarn logs -applicationId

I checked even below HDFS path also

/tmp/logs/


I am running my spark-submit command by passing below arguments, Even then
its not working

  --master yarn --deploy-mode cluster   --conf
"spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"
 --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties"

Can somebody guide me on logging within spark RDD and map functions ? What
am I missing in the above steps ?

Thanks

Dev


Re: Spark support for Complex Event Processing (CEP)

2016-04-29 Thread Mich Talebzadeh
OK, like any work you need to start this from a simple model. Take one bus
only (identified by bus number, which is unique).

For any bus number N you have two logs, LOG A and LOG B, plus LOG C from the
central computer that sends the estimated time of arrival
(ETA) to the bus stops. Pretty simple.

What is the difference in timestamp between LOG A and LOG B for bus N? Are
they the same?

Your window for a given bus would be a Start Time (start from station,
deterministic, already known) and an End Time (complete round). The end time
could be, say, start from station + 1 hour, or anything bigger.

val ssc = new StreamingContext(sparkConf, Seconds(xx))

Then it is pretty simple.. You need to work out

val windowLength = xx
val slidingInterval = yy

For each bus you have two topics (LOG A and LOG B) plus LOG C that you need
to update based on the LOG A and LOG B outcomes.

Start from this simple heuristic model first
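
A minimal sketch of that skeleton, assuming Spark 1.x streaming; the source,
the keying by bus number, and the durations below are placeholders, not from
the thread:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("bus-eta")
// Batch interval matched to the 30-second LOG A update rate.
val ssc = new StreamingContext(sparkConf, Seconds(30))

// Placeholder source; in practice this would be the log feed.
val events = ssc.socketTextStream("loghost", 9999)

// Key each record by bus number, then slide a window covering one full round.
val perBus = events
  .map(line => (line.split(",")(1), line))          // (busNumber, rawRecord)
  .groupByKeyAndWindow(Seconds(3600), Seconds(60))  // windowLength, slidingInterval

perBus.print()
ssc.start()
ssc.awaitTermination()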

HTH

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 29 April 2016 at 09:54, Esa Heikkinen 
wrote:

>
> Hi
>
> I try to explain my case ..
>
> Situation is not so simple in my logs and solution. There also many types
> of logs and there are from many sources.
> They are as csv-format and header line includes names of the columns.
>
> This is simplified description of input logs.
>
> LOG A's: bus coordinate logs (every bus has own log):
> - timestamp
> - bus number
> - coordinates
>
> LOG B: bus login/logout (to/from line) message log:
> - timestamp
> - bus number
> - line number
>
> LOG C:  log from central computers:
> - timestamp
> - bus number
> - bus stop number
> - estimated arrival time to bus stop
>
> LOG A are updated every 30 seconds (i have also another system by 1
> seconds interval). LOG B are updated when bus starts from terminal bus stop
> and arrives to final bus stop in a line. LOG C is updated when central
> computer sends new arrival time estimation to bus stop.
>
> I also need metadata for logs (and analyzer). For example coordinates for
> bus stop areas.
>
> Main purpose of analyzing is to check an accuracy (error) of the estimated
> arrival time to bus stops.
>
> Because there are many buses and lines, it is too time-consuming to check
> all of them. So i check only specific lines with specific bus stops. There
> are many buses (logged to lines) coming to one bus stop and i am interested
> about only certain bus.
>
> To do that, I have to read the logs partly out of time order (upstream), in
> this sequence:
> 1. Search LOG C for the bus number.
> 2. Search LOG A for when the bus left the terminal bus stop.
> 3. Search LOG B for when the bus sent a login to the line.
> 4. Search LOG A for when the bus entered the bus stop.
> 5. Search LOG C for the last estimated arrival time to the bus stop, and
> calculate the error between the real and estimated values.
>
> In my understanding, (almost) all log file analyzers read all data (lines)
> in time order from the log files. I need only a specific part of the log
> (lines). To achieve that, my solution is to read the logs in an arbitrary
> order (within a given time window).
>
> I know this solution is not suitable for all cases (for example, for very
> fast analysis or very big data). It is suitable for very complex (targeted)
> analysis. It can be too slow and memory-consuming, but well-done
> pre-processing of the log data can help a lot.
>
> ---
> Esa Heikkinen
>
>
> 28.4.2016, 14:44, Michael Segel kirjoitti:
>
> I don’t.
>
> I believe there have been a couple of hack-a-thons, like one done in
> Chicago a few years back, using public transportation data.
>
> The first question is what sort of data do you get from the city?
>
> I mean it could be as simple as time_stamp, bus_id, route and GPS (x,y).
> Or they could provide more information, like last stop, distance to next
> stop, avg current velocity…
>
> Then there is the frequency of the updates. Every second? Every 3 seconds?
> 5 or 6 seconds…
>
> This will determine how much work you have to do.
>
> Maybe they provide the routes of the buses via a different API call since
> it's relatively static.
>
> This will drive your solution more than the underlying technology.
>
> Oh, and while I focused on buses, there are also rail and other modes of
> public transportation like light rail, trains, etc…
>
> HTH
>
> -Mike
>
>
> On Apr 28, 2016, at 4:10 AM, Esa Heikkinen wrote:
>
>
> Do you know any good examples of how to use Spark Streaming for tracking
> public transportation systems?
>
> Or Storm or some other tool example?
>
> Regards
> Esa Heikkinen
>
> 28.4.2016, 3:16, Michael Segel kirjoitti:
>
> Uhm…
> I think you need to clarify a couple of things…
>
> First, there is this thing called analog signal processing… Is that
> continuous enough for you?

Re: Spark on AWS

2016-04-29 Thread Steve Loughran

On 28 Apr 2016, at 22:59, Alexander Pivovarov wrote:

Spark works well with S3 (read and write). However, it's recommended to set
spark.speculation to true (it's expected that some tasks will fail if you read
a large S3 folder, so speculation should help).


I must disagree.


  1.  Speculative execution has >1 executor running the query, with whoever
finishes first winning.
  2.  However, "finishes first" is implemented in the output committer by
renaming the attempt's output directory to the final output directory: whoever
renames first wins.
  3.  This relies on rename() being implemented in the filesystem client as an
atomic transaction.
  4.  Unfortunately, S3 doesn't do renames. Instead, every file gets copied to
one with the new name, then the old file is deleted; an operation that takes
time O(data * files).

If you have more than one executor trying to commit the work simultaneously,
your output will be a mess of both executions, without anything detecting and
reporting it.
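
[Editor's note] For completeness, a minimal sketch of the safer configuration
for S3-bound output: leave speculation at its default of false. The bucket and
path below are placeholders, and the s3a scheme assumes a Hadoop 2.6+ client.

import org.apache.spark.{SparkConf, SparkContext}

// Keep speculative execution disabled (the default) for jobs that commit
// output to S3, since the rename-based output committer is not atomic there.
val conf = new SparkConf()
  .setAppName("s3-write-sketch")
  .set("spark.speculation", "false")   // explicit, though false is the default

val sc = new SparkContext(conf)
sc.parallelize(1 to 1000)
  .map(i => s"$i,record-$i")
  .saveAsTextFile("s3a://my-bucket/output/")   // hypothetical destination
sc.stop()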

Where did you find this recommendation to set speculation=true?

-Steve

see also: https://issues.apache.org/jira/browse/SPARK-10063


Re: Spark support for Complex Event Processing (CEP)

2016-04-29 Thread Esa Heikkinen


Hi

I will try to explain my case.

The situation in my logs and solution is not so simple. There are many
types of logs, and they come from many sources.

They are in CSV format, and the header line includes the names of the columns.

This is a simplified description of the input logs:

LOG A's: bus coordinate logs (every bus has own log):
- timestamp
- bus number
- coordinates

LOG B: bus login/logout (to/from line) message log:
- timestamp
- bus number
- line number

LOG C:  log from central computers:
- timestamp
- bus number
- bus stop number
- estimated arrival time to bus stop

The LOG A logs are updated every 30 seconds (I also have another system with
a 1-second interval). LOG B is updated when a bus starts from the terminal
bus stop and arrives at the final bus stop of a line. LOG C is updated when
the central computer sends a new arrival time estimate to a bus stop.


I also need metadata for the logs (and the analyzer), for example the
coordinates of bus stop areas.


The main purpose of the analysis is to check the accuracy (error) of the
estimated arrival time to bus stops.


Because there are many buses and lines, it is too time-consuming to check
all of them, so I check only specific lines with specific bus stops. There
are many buses (logged in to lines) coming to one bus stop, and I am
interested in only a certain bus.


To do that, I have to read the logs partly out of time order (upstream), in
this sequence:

1. Search LOG C for the bus number.
2. Search LOG A for when the bus left the terminal bus stop.
3. Search LOG B for when the bus sent a login to the line.
4. Search LOG A for when the bus entered the bus stop.
5. Search LOG C for the last estimated arrival time to the bus stop, and
calculate the error between the real and estimated values.


In my understanding, (almost) all log file analyzers read all data (lines)
in time order from the log files. I need only a specific part of the log
(lines). To achieve that, my solution is to read the logs in an arbitrary
order (within a given time window).


I know this solution is not suitable for all cases (for example, for very
fast analysis or very big data). It is suitable for very complex (targeted)
analysis. It can be too slow and memory-consuming, but well-done
pre-processing of the log data can help a lot.
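
[Editor's note] A rough sketch of how the targeted lookup could look with
Spark DataFrames, for illustration only: the spark-csv package (Spark 1.x
era), the HDFS paths, and the column names (timestamp, bus, stop, eta) are
all assumptions, and only steps 1 and 5 are shown.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.col

val sqlContext = new SQLContext(sc)

def loadCsv(path: String) =
  sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")       // the logs carry a CSV header line
    .option("inferSchema", "true")
    .load(path)

val logC = loadCsv("hdfs:///logs/central.csv")   // ETA estimates (LOG C)
val logA = loadCsv("hdfs:///logs/coords.csv")    // bus coordinates (LOG A)

// Step 1: pick the bus of interest from LOG C for a given stop.
val busNo = logC.filter(col("stop") === 1234).select("bus").first().get(0)

// Step 5 (simplified): the last ETA estimate for that bus and stop.
// Steps 2-4 (matching LOG A coordinates against the bus-stop-area
// metadata, and the LOG B login) are left out here.
val lastEstimate = logC
  .filter(col("bus") === busNo && col("stop") === 1234)
  .orderBy(col("timestamp").desc)
  .limit(1)

lastEstimate.show()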


---
Esa Heikkinen

28.4.2016, 14:44, Michael Segel kirjoitti:

I don’t.

I believe there have been a couple of hack-a-thons, like one done
in Chicago a few years back, using public transportation data.


The first question is what sort of data do you get from the city?

I mean it could be as simple as time_stamp, bus_id, route and GPS
(x,y). Or they could provide more information, like last stop,
distance to next stop, avg current velocity…


Then there is the frequency of the updates. Every second? Every 3 
seconds? 5 or 6 seconds…


This will determine how much work you have to do.

Maybe they provide the routes of the buses via a different API call
since it's relatively static.


This will drive your solution more than the underlying technology.

Oh, and while I focused on buses, there are also rail and other modes of
public transportation like light rail, trains, etc…


HTH

-Mike


On Apr 28, 2016, at 4:10 AM, Esa Heikkinen wrote:



Do you know any good examples of how to use Spark Streaming for tracking
public transportation systems?


Or Storm or some other tool example?

Regards
Esa Heikkinen

28.4.2016, 3:16, Michael Segel kirjoitti:

Uhm…
I think you need to clarify a couple of things…

First, there is this thing called analog signal processing… Is that
continuous enough for you?


But more to the point, Spark Streaming does micro-batching, so if
you’re processing a continuous stream of tick data, you will have
more than 50K ticks per second while the markets are open and
trading. Even at 50K a second, that would mean one every 0.02 ms, or
50 ticks a ms.


And you don’t want to wait until you have a batch to start
processing; you want to process when the data hits the queue and
pull it from the queue as quickly as possible.


Spark Streaming will be able to pull batches in as little as 500 ms.
So if you pull a batch at t0 and immediately have a tick in your
queue, you won’t process that data until t0+500 ms. And at 50K ticks
a second, said batch would contain 25,000 entries.


Depending on what you are doing… that 500ms delay can be enough to 
be fatal to your trading process.


If you don’t like stock data, there are other examples mainly when 
pulling data from real time embedded systems.



If you go back and read what I said: if your data-flow interval is
>> 500 ms (much slower), and/or the time to process is >> 500 ms
(much longer), you could use Spark Streaming. If not… and there are
applications which require that type of speed… then you shouldn’t
use Spark Streaming.


If you do have that constraint, then you can look at systems like 
storm/flink/samza / whatever where you have a 

Re: Create multiple output files from Thriftserver

2016-04-29 Thread Mich Talebzadeh
Hi,

Two points here.

1) Beeline uses a JDBC connection to connect to the Hive server. You can
actually put your code in an HQL file and run it as:

beeline -u jdbc:hive2://:10010/default -d
org.apache.hive.jdbc.HiveDriver -n hduser -p x -f mycode.hql

2) How do you want to split your output file? Is it based on the results of
your query?
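
If the goal is several part files rather than one redirected text file, one
option worth sketching (an assumption on my part, not a Thrift server feature)
is to bypass beeline and run the same query in a Spark job, letting the writer
emit one file per partition. The table name, output path, and the spark-csv
package are placeholders:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val result = hiveContext.sql("SELECT * FROM default.my_table")

result
  .repartition(8)                       // -> 8 part files in the output dir
  .write
  .format("com.databricks.spark.csv")   // assumes the spark-csv package
  .option("header", "true")
  .save("hdfs:///user/hduser/query_output")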

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 29 April 2016 at 06:43, mayankshete  wrote:

> Is there a way to create multiple output files when connected from beeline
> to the Thrift server?
> Right now I am using beeline -e 'query' > output.txt, which is not efficient
> as it uses a Linux operator to combine the output files.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Create-multiple-output-files-from-Thriftserver-tp26845.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [Spark 1.5.2]All data being written to only one part file rest part files are empty

2016-04-29 Thread Divya Gehlot
Hi ,

I observed that if I use a subset of the same dataset, or the dataset is
small, it writes to many part files.
If the dataset grows, it writes to only one part file and the rest of the
part files are empty.


Thanks,
Divya

On 25 April 2016 at 23:15, nguyen duc tuan  wrote:

> Maybe the problem is the data itself. For example, the first dataframe
> might have common keys in only one part of the second dataframe. I think you
> can verify whether you are in this situation by repartitioning one dataframe
> and joining it. If this is the true reason, you might see the result
> distributed more evenly.
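
[Editor's note] A minimal sketch of that check; df1, df2, the key column, and
the output path are hypothetical. In Spark 1.5, repartition(n) reshuffles the
rows evenly across n partitions before the join.

// Reshuffle one side before the join, then inspect how the output
// spreads across part files compared with the original run.
val joined = df1.repartition(200).join(df2, "join_key")

joined.write
  .format("com.databricks.spark.csv")   // assumes the spark-csv package
  .option("header", "true")
  .save("hdfs:///tmp/joined_output")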
>
> 2016-04-25 9:34 GMT+07:00 Divya Gehlot :
>
>> Hi,
>>
>> After joining two dataframes, I am saving the dataframe using Spark CSV.
>> But all the result data is being written to only one part file, whereas
>> 200 part files are being created; the remaining 199 part files are empty.
>>
>> What is the cause of the uneven partitioning? How can I evenly distribute
>> the data?
>> Would really appreciate the help.
>>
>>
>> Thanks,
>> Divya
>>
>
>