Re: Dynamic metric names

2019-05-06 Thread Saisai Shao
I remember there was a PR doing something similar (
https://github.com/apache/spark/pull/18406). From my understanding, this
is quite a specific requirement, and it may require a code change to
support your needs.

Thanks
Saisai
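
A toy model of the snapshot behaviour described in the question quoted below may make the limitation concrete. It is plain Python, not Spark or Dropwizard code: `ToyRegistry` and `register_source` are hypothetical names, and the only assumption is that registration copies the metrics present at that moment, as the question states.

```python
class ToyRegistry:
    """Minimal stand-in for a metric registry: a name -> value mapping."""

    def __init__(self):
        self.metrics = {}

    def register(self, name, value):
        self.metrics[name] = value


def register_source(shared, source_name, source_registry):
    """Copy the metrics the source holds *right now* into the shared registry.

    Assumption: this mirrors the one-time copy the question describes;
    nothing keeps the two registries in sync afterwards.
    """
    for name, value in source_registry.metrics.items():
        shared.register(f"{source_name}.{name}", value)


shared = ToyRegistry()
source = ToyRegistry()
source.register("requests", 1)

register_source(shared, "mySource", source)

# Added after registration: visible in the source, but not in the shared
# registry, which is the only one the sinks read in this model.
source.register("late_metric", 2)

print(sorted(shared.metrics))  # ['mySource.requests']
print(sorted(source.metrics))  # ['late_metric', 'requests']
```

In this model the late metric only reaches the shared registry if something copies it again, which is consistent with the reply's expectation that a code change is needed.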

Sergey Zhemzhitsky wrote on Sat, May 4, 2019 at 4:44 PM:

> Hello Spark Users!
>
> Just wondering whether it is possible to register a metric source without
> metrics known in advance and add the metrics themselves to this source
> later on?
>
> It seems that currently MetricsSystem puts all the metrics from the
> source's MetricRegistry into a shared MetricRegistry of the MetricsSystem
> during metric source registration [1].
>
> So in case there is a new metric with a new name added to the source's
> registry after this source registration, then this new metric will not be
> reported to the sinks.
>
> What I'd like to achieve is to be able to register new metrics with new
> names dynamically using a single metric source.
> Is it somehow possible?
>
>
> [1]
> https://github.com/apache/spark/blob/51de86baed0776304c6184f2c04b6303ef48df90/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L162
>


[ANNOUNCE] Announcing Apache Spark 2.3.2

2018-09-26 Thread Saisai Shao
We are happy to announce the availability of Spark 2.3.2!

Apache Spark 2.3.2 is a maintenance release, based on the branch-2.3
maintenance branch of Spark. We strongly recommend all 2.3.x users to
upgrade to this stable release.

To download Spark 2.3.2, head over to the download page:
http://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-2-3-2.html

We would like to acknowledge all community members for contributing to
this release. This release would not have been possible without you.


Best regards
Saisai


Re: Spark YARN job submission error (code 13)

2018-06-08 Thread Saisai Shao
In Spark on YARN, exit code 13 means the SparkContext was not initialized in
time. You can check the YARN application log to get more information.

BTW, did you just write a plain Python script without creating a
SparkContext/SparkSession?
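
As a sketch of the "check the YARN application log" step: the application ID can be pulled out of the tracking URL in the report quoted below and passed to `yarn logs -applicationId`, which assumes log aggregation is enabled on the cluster. `app_id_from_url` and `yarn_logs_command` are hypothetical helper names.

```python
import re

# YARN application IDs look like application_<cluster timestamp>_<sequence>.
APP_ID = re.compile(r"application_\d+_\d+")


def app_id_from_url(tracking_url):
    """Return the YARN application ID embedded in a tracking URL, or None."""
    match = APP_ID.search(tracking_url)
    return match.group(0) if match else None


def yarn_logs_command(app_id):
    """Build the argv for fetching the aggregated logs of one application."""
    return ["yarn", "logs", "-applicationId", app_id]


url = "http://MasterNode:8088/cluster/app/application_1528296308262_0004"
print(" ".join(yarn_logs_command(app_id_from_url(url))))
# yarn logs -applicationId application_1528296308262_0004
```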

Aakash Basu wrote on Fri, Jun 8, 2018 at 4:15 PM:

> Hi,
>
> I'm trying to run a program on a cluster using YARN.
>
> YARN is present there along with HADOOP.
>
> Problem I'm running into is as below -
>
> Container exited with a non-zero exit code 13
>> Failing this attempt. Failing the application.
>>  ApplicationMaster host: N/A
>>  ApplicationMaster RPC port: -1
>>  queue: default
>>  start time: 1528297574594
>>  final status: FAILED
>>  tracking URL:
>> http://MasterNode:8088/cluster/app/application_1528296308262_0004
>>  user: bblite
>> Exception in thread "main" org.apache.spark.SparkException: Application
>> application_1528296308262_0004 finished with failed status
>>
>
> I checked on the net, and most of the Stack Overflow questions say that the
> users had set .master('local[*]') in the code when creating the
> SparkSession while also passing --master yarn to spark-submit, so they
> got the error because of the conflict.
>
> But in my case, I haven't set any master in the code at all. I'm just
> trying to run it on YARN by passing --master yarn to spark-submit.
> Below is the code that creates the Spark session -
>
> from pyspark.sql import SparkSession
>
> spark = SparkSession\
>     .builder\
>     .appName("Temp_Prog")\
>     .getOrCreate()
>
> Below is the spark-submit -
>
> spark-submit --master yarn --deploy-mode cluster --num-executors 3 \
>   --executor-cores 6 --executor-memory 4G \
>   /appdata/codebase/backend/feature_extraction/try_yarn.py
>
> I've tried without --deploy-mode too, still no help.
>
> Thanks,
> Aakash.
>


Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread Saisai Shao
By "dependent" I mean that this batch's job relies on the previous batch's
result, so this batch should wait for the previous batch to finish. If you
set "spark.streaming.concurrentJobs" larger than 1, the current batch could
start without waiting for the previous batch (if it is delayed), which will
lead to unexpected results.
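
The ordering effect can be illustrated with a small deterministic simulation. This is a toy scheduling model, not Spark's job scheduler: `completion_order`, the batch durations, and the batch interval are all made up for illustration, and the only assumption is that each batch's job takes the next free slot out of `concurrent_jobs` slots.

```python
import heapq


def completion_order(durations, interval, concurrent_jobs):
    """Return batch indices in the order their jobs finish.

    Batch i is submitted at time i * interval and starts as soon as it has
    been submitted and one of the `concurrent_jobs` slots is free.
    """
    free_at = [0.0] * concurrent_jobs  # min-heap: when each slot frees up
    heapq.heapify(free_at)
    finished = []
    for batch, duration in enumerate(durations):
        submitted = batch * interval
        start = max(submitted, heapq.heappop(free_at))
        end = start + duration
        heapq.heappush(free_at, end)
        finished.append((end, batch))
    return [batch for _, batch in sorted(finished)]


# Batch 0 is delayed (5 time units); batches 1 and 2 are quick.
print(completion_order([5, 1, 1], interval=1, concurrent_jobs=1))  # [0, 1, 2]
print(completion_order([5, 1, 1], interval=1, concurrent_jobs=2))  # [1, 2, 0]
```

With one slot the batches finish in submission order; with two, the later batches overtake the delayed one, which is harmless only if no batch depends on the result of an earlier one.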


thomas lavocat wrote on Tue, Jun 5, 2018 at 7:48 PM:

>
> On 05/06/2018 13:44, Saisai Shao wrote:
>
> You need to read the code, this is an undocumented configuration.
>
> I'm on it right now, but, Spark is a big piece of software.
>
> Basically, this will break the ordering of streaming jobs. AFAIK, you may
> get unexpected results if your streaming jobs are not independent.
>
> What do you mean exactly by not independent ?
> Are several source joined together dependent ?
>
> Thanks,
> Thomas
>
>
> thomas lavocat wrote on Tue, Jun 5, 2018 at 7:17 PM:
>
>> Hello,
>>
>> Thanks for your answer.
>>
>> On 05/06/2018 11:24, Saisai Shao wrote:
>>
>> spark.streaming.concurrentJobs is a driver-side internal configuration
>> that controls how many streaming jobs can be submitted concurrently in
>> one batch. Usually it should not be configured by users unless you're
>> familiar with Spark Streaming internals and know the implications of this
>> configuration.
>>
>>
>> How can I find some documentation about those implications ?
>>
>> I've experimented with some settings of this parameter and found that
>> my overall throughput increases in correlation with this property.
>> But I'm experiencing scalability issues. With more than 16 receivers
>> spread over 8 executors, my executors no longer receive work from the
>> driver and fall idle.
>> Is there an explanation ?
>>
>> Thanks,
>> Thomas
>>
>>
>


Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread Saisai Shao
You need to read the code; this is an undocumented configuration.

Basically, this will break the ordering of streaming jobs. AFAIK, you may
get unexpected results if your streaming jobs are not independent.

thomas lavocat wrote on Tue, Jun 5, 2018 at 7:17 PM:

> Hello,
>
> Thanks for your answer.
>
> On 05/06/2018 11:24, Saisai Shao wrote:
>
> spark.streaming.concurrentJobs is a driver-side internal configuration
> that controls how many streaming jobs can be submitted concurrently in
> one batch. Usually it should not be configured by users unless you're
> familiar with Spark Streaming internals and know the implications of this
> configuration.
>
>
> How can I find some documentation about those implications ?
>
> I've experimented with some settings of this parameter and found that
> my overall throughput increases in correlation with this property.
> But I'm experiencing scalability issues. With more than 16 receivers
> spread over 8 executors, my executors no longer receive work from the
> driver and fall idle.
> Is there an explanation ?
>
> Thanks,
> Thomas
>
>


Re: [Spark Streaming] is spark.streaming.concurrentJobs a per node or a cluster global value ?

2018-06-05 Thread Saisai Shao
spark.streaming.concurrentJobs is a driver-side internal configuration
that controls how many streaming jobs can be submitted concurrently in
one batch. Usually it should not be configured by users unless you're
familiar with Spark Streaming internals and know the implications of this
configuration.



thomas lavocat wrote on Tue, Jun 5, 2018 at 4:20 PM:

> Hi everyone,
>
> I'm wondering if the property spark.streaming.concurrentJobs should
> reflect the total number of possible concurrent tasks on the cluster, or
> a local number of concurrent tasks on one compute node.
>
> Thanks for your help.
>
> Thomas
>
>
>
>


Re: [Spark Streaming]: Does DStream workload run over Spark SQL engine?

2018-05-02 Thread Saisai Shao
No, DStream is built on top of RDDs, so it will not leverage any Spark SQL
related features. I think you should use Structured Streaming instead, which
is based on Spark SQL.

Khaled Zaouk wrote on Wed, May 2, 2018 at 4:51 PM:

> Hi,
>
> I have a question regarding the execution engine of Spark Streaming
> (DStream API): Do Spark Streaming jobs run on the Spark SQL engine?
>
> For example, if I change a configuration parameter related to Spark SQL
> (like spark.sql.streaming.minBatchesToRetain or
> spark.sql.objectHashAggregate.sortBased.fallbackThreshold), does this
> make any difference when I run Spark streaming job (using DStream API)?
>
> Thank you!
>
> Khaled
>


Re: How to submit some code segment to existing SparkContext

2018-04-11 Thread Saisai Shao
Maybe you can try Livy (http://livy.incubator.apache.org/).

Thanks
Jerry
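
A rough sketch of what the Livy suggestion looks like over REST: one request creates a long-lived session (so the SparkContext is started once), and later requests send code to it. The requests are only built here, not sent. The host name, the session id `0`, and the helper name `livy_request` are placeholders, and the endpoints are the `/sessions` and `/sessions/{id}/statements` routes from Livy's REST API.

```python
import json
from urllib.request import Request


def livy_request(base_url, path, payload):
    """Build (but do not send) a JSON POST request for a Livy endpoint."""
    return Request(
        url=f"{base_url.rstrip('/')}{path}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


LIVY = "http://livy-host:8998"  # hypothetical server address

# 1. Create an interactive session; its SparkContext is started once.
create_session = livy_request(LIVY, "/sessions", {"kind": "pyspark"})

# 2. Send code to that session. The real session id comes from the response
#    to step 1; 0 is only a placeholder here.
run_code = livy_request(
    LIVY,
    "/sessions/0/statements",
    {"code": "sc.parallelize(range(10)).count()"},
)

print(run_code.get_method(), run_code.full_url)
# POST http://livy-host:8998/sessions/0/statements
```

Passing either request to `urllib.request.urlopen` would send it; the statement result is then polled from the statement's own URL.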

2018-04-11 15:46 GMT+08:00 杜斌 :

> Hi,
>
> Is there any way to submit some code segment to the existing SparkContext?
> Just like a web backend, send some user code to the Spark to run, but the
> initial SparkContext takes time, just want to execute some code or Spark
> Sql, and get the result quickly.
>
> Thanks,
> Bin
>


Re: spark application running in yarn client mode is slower than in local mode.

2018-04-09 Thread Saisai Shao
>
> In yarn mode, only two executors are assigned to process the tasks; since
> one executor can process only one task, they need 6 min in total.
>

This is not true. You should set --executor-cores/--num-executors to
increase the task parallelism of the executors. To be fair, the Spark
application should have the same resources (CPU/memory) when comparing
local and YARN mode.
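
The arithmetic behind this can be sketched as follows, assuming every task takes the same time and uses one core (the `spark.task.cpus` default). `stage_minutes` is a hypothetical helper, and the task count of four is a made-up figure, not one taken from the thread.

```python
import math


def stage_minutes(num_tasks, task_minutes, num_executors, executor_cores):
    """Estimate stage wall time when every task takes the same time.

    Assumes one core per task, so the number of tasks running at once
    is num_executors * executor_cores.
    """
    slots = num_executors * executor_cores
    waves = math.ceil(num_tasks / slots)
    return waves * task_minutes


# Four 3-minute tasks:
print(stage_minutes(4, 3, num_executors=2, executor_cores=1))  # 6
print(stage_minutes(4, 3, num_executors=2, executor_cores=2))  # 3
```

Two single-core executors need two waves; giving each executor two cores (or adding executors) brings the stage back to a single wave.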

2018-04-10 10:05 GMT+08:00 Junfeng Chen :

> I found the potential reason.
>
> In local mode, all tasks in one stage run concurrently, while tasks in
> yarn mode run in sequence.
>
> For example, in one stage, each task takes 3 min. In local mode, they
> run together and take 3 min in total. In yarn mode, only two executors
> are assigned to process the tasks; since one executor can process only one
> task, they need 6 min in total.
>
>
> Regard,
> Junfeng Chen
>
> On Mon, Apr 9, 2018 at 2:12 PM, Jörn Franke  wrote:
>
>> Probably network / shuffling cost? Or broadcast variables? Can you
>> provide more details what you do and some timings?
>>
>> > On 9. Apr 2018, at 07:07, Junfeng Chen  wrote:
>> >
>> > I have written a Spark Streaming application that reads Kafka data,
>> > converts the JSON data to Parquet, and saves it to HDFS.
>> > What puzzles me is that the app's processing time in yarn mode is
>> > 20% to 50% longer than in local mode. My cluster has three nodes with
>> > three node managers, and all three hosts have the same hardware: 40 cores
>> > and 256 GB of memory.
>> >
>> > Why? How to solve it?
>> >
>> > Regard,
>> > Junfeng Chen
>>
>
>


Re: Spark and Accumulo Delegation tokens

2018-03-23 Thread Saisai Shao
It is in the YARN module:
"org.apache.spark.deploy.yarn.security.ServiceCredentialProvider".

2018-03-23 15:10 GMT+08:00 Jorge Machado <jom...@me.com>:

> Hi Jerry,
>
> where do you see that class in Spark? I only found
> HadoopDelegationTokenManager,
> and I don’t see any way to add my provider to it.
>
> private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
>   val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
>     new HiveDelegationTokenProvider,
>     new HBaseDelegationTokenProvider)
>
>   // Filter out providers for which spark.security.credentials.{service}.enabled is false.
>   providers
>     .filter { p => isServiceEnabled(p.serviceName) }
>     .map { p => (p.serviceName, p) }
>     .toMap
> }
>
>
> If you could give me a tip, that would be great.
> Thanks
>
> Jorge Machado
>
>
>
>
>
> On 23 Mar 2018, at 07:38, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
> I think you can build your own Accumulo credential provider outside of
> Spark, similar to HadoopDelegationTokenProvider. Spark already provides a
> "ServiceCredentialProvider" interface for users to plug in a customized
> credential provider.
>
> Thanks
> Jerry
>
> 2018-03-23 14:29 GMT+08:00 Jorge Machado <jom...@me.com>:
>
> Hi Guys,
>
> I’m in the middle of writing a Spark DataSource connector for Apache Spark
> to connect to Accumulo tablets. Because we have Kerberos, it gets a little
> tricky, because Spark only handles the delegation tokens for HBase, Hive and
> HDFS.
>
> Would a PR for an implementation of HadoopDelegationTokenProvider for
> Accumulo be accepted?
>
>
> Jorge Machado
>
>
>
>
>
>
>
>


Re: Spark and Accumulo Delegation tokens

2018-03-23 Thread Saisai Shao
I think you can build your own Accumulo credential provider outside of
Spark, similar to HadoopDelegationTokenProvider. Spark already provides a
"ServiceCredentialProvider" interface for users to plug in a customized
credential provider.

Thanks
Jerry

2018-03-23 14:29 GMT+08:00 Jorge Machado :

> Hi Guys,
>
> I’m in the middle of writing a Spark DataSource connector for Apache Spark
> to connect to Accumulo tablets. Because we have Kerberos, it gets a little
> tricky, because Spark only handles the delegation tokens for HBase, Hive and
> HDFS.
>
> Would a PR for an implementation of HadoopDelegationTokenProvider for
> Accumulo be accepted?
>
>
> Jorge Machado
>
>
>
>
>
>


Re: Is Apache Spark-2.2.1 compatible with Hadoop-3.0.0

2018-01-07 Thread Saisai Shao
AFAIK, there has been no large-scale testing with Hadoop 3.0 in the
community, so it is not clear whether it is supported (or has some issues).
I think "Pre-Built for Apache Hadoop 2.7 and later" on the download page
mostly means that it supports Hadoop 2.7+ (2.8...), but not 3.0 (IIUC).

Thanks
Jerry

2018-01-08 4:50 GMT+08:00 Raj Adyanthaya :

> Hi Akshay
>
> On the Spark Download page when you select Spark 2.2.1 it gives you an
> option to select package type. In that, there is an option to select
> "Pre-Built for Apache Hadoop 2.7 and later". I am assuming it means that it
> does support Hadoop 3.0.
>
> http://spark.apache.org/downloads.html
>
> Thanks,
> Raj A.
>
> On Sat, Jan 6, 2018 at 8:23 PM, akshay naidu wrote:
>
>> hello Users,
>> I need to know whether we can run latest spark on  latest hadoop version
>> i.e., spark-2.2.1 released on 1st dec and hadoop-3.0.0 released on 13th dec.
>> thanks.
>>
>
>


Re: Multiple vcores per container when running Spark applications in Yarn cluster mode

2017-09-10 Thread Saisai Shao
I guess you're using the Capacity Scheduler with DefaultResourceCalculator,
which doesn't take CPU cores into account in resource calculation, so the
"1" you saw is actually meaningless. If you also want CPU to be taken into
account, you should choose DominantResourceCalculator.

Thanks
Jerry
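
For reference, the calculator is selected through the `yarn.scheduler.capacity.resource-calculator` property in `capacity-scheduler.xml`. The sketch below only renders that one property element; `hadoop_property` is a hypothetical helper, and how the change is rolled out to the ResourceManager depends on the cluster setup.

```python
import xml.etree.ElementTree as ET


def hadoop_property(name, value):
    """Build a Hadoop-style <property> element with <name> and <value>."""
    prop = ET.Element("property")
    ET.SubElement(prop, "name").text = name
    ET.SubElement(prop, "value").text = value
    return prop


calculator = hadoop_property(
    "yarn.scheduler.capacity.resource-calculator",
    "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator",
)

# Prints the element to paste inside <configuration> in capacity-scheduler.xml.
print(ET.tostring(calculator, encoding="unicode"))
```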

On Sat, Sep 9, 2017 at 6:54 AM, Xiaoye Sun  wrote:

> Hi,
>
> I am using Spark 1.6.1 and Yarn 2.7.4.
> I want to submit a Spark application to a Yarn cluster. However, I found
> that the number of vcores assigned to a container/executor is always 1,
> even if I set spark.executor.cores=2. I also found the number of tasks an
> executor runs concurrently is 2. So, it seems that Spark knows that an
> executor/container has two CPU cores but the request is not correctly sent
> to the Yarn resource scheduler. I am using the
> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
> on Yarn.
>
> I am wondering whether it is possible to assign multiple vcores to a
> container when a Spark job is submitted to a Yarn cluster in yarn-cluster
> mode.
>
> Thanks!
> Best,
> Xiaoye
>


Re: Port to open for submitting Spark on Yarn application

2017-09-03 Thread Saisai Shao
I think spark.yarn.am.port is not used any more, so you don't need to
consider it.

If you're running Spark on YARN, I think the YARN RM port used to submit
applications should also be reachable through the firewall, as well as the
HDFS port used to upload resources.

Also, on the Spark side, executors connect to the driver via
spark.driver.port, so you may want to set a fixed port number for it
and add it to the firewall whitelist.

Thanks
Jerry


On Mon, Sep 4, 2017 at 8:50 AM, Satoshi Yamada  wrote:

> Hi,
>
> In case we run Spark on Yarn in client mode, with a firewall around the
> Hadoop cluster and the client node outside the firewall, I think I have to
> open some ports that the Application Master uses.
>
> I think the port is specified by "spark.yarn.am.port", as the documentation says.
> https://spark.apache.org/docs/latest/running-on-yarn.html
>
> But, according to the source code, spark.yarn.am.port is deprecated since 2.0.
> https://github.com/apache/spark/commit/829cd7b8b70e65a91aa66e6d626bd45f18e0ad97
>
> Does this mean we do not need to open particular firewall ports for
> Spark on Yarn?
>
>
> Thanks,
>
>


Re: Livy with Spark package

2017-08-23 Thread Saisai Shao
You could set "spark.jars.packages" in the `conf` field of the session POST API (
https://github.com/apache/incubator-livy/blob/master/docs/rest-api.md#post-sessions).
This is equivalent to --packages in spark-submit.

BTW, you'd better ask Livy questions on u...@livy.incubator.apache.org.

Thanks
Jerry
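
A minimal sketch of the request body this implies, using the package coordinate from the question quoted below. `session_payload` is a hypothetical helper; the body would be sent as JSON to Livy's session creation endpoint.

```python
import json


def session_payload(kind, packages):
    """Body for creating a Livy session with extra Maven packages.

    spark.jars.packages takes a comma-separated list of coordinates,
    the same format spark-submit --packages expects.
    """
    return {
        "kind": kind,
        "conf": {"spark.jars.packages": ",".join(packages)},
    }


payload = session_payload("pyspark", ["com.databricks:spark-avro_2.11:3.2.0"])
print(json.dumps(payload, indent=2))
```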

On Thu, Aug 24, 2017 at 8:11 AM, ayan guha  wrote:

> Hi
>
> I have a python program which I invoke as
>
>  spark-submit --packages com.databricks:spark-avro_2.11:3.2.0 somefile.py
>  "2017-08-23 02:00:00"  and it works
>
> Now I want to submit this file using Livy. I could work out most of the
> stuff (like putting files to HDFS etc) but not able to understand how/where
> to configure the "packages" switch...Any help?
> --
> Best Regards,
> Ayan Guha
>


Re: Spark Web UI SSL Encryption

2017-08-21 Thread Saisai Shao
Can you please post the specific problem you met?

Thanks
Jerry

On Sat, Aug 19, 2017 at 1:49 AM, Anshuman Kumar wrote:

> Hello,
>
> I have recently installed Spark 2.2.0 and am trying to use it for some big
> data processing. Spark is installed on a server that I access from a remote
> computer. I need to set up SSL encryption for the Spark web UI, but after
> following some threads online I’m still not able to set it up.
>
> Can someone help me with the SSL encryption?
>
> Warm Regards.
>
>


Re: Kafka 0.10 with PySpark

2017-07-05 Thread Saisai Shao
Please see the reason in this thread (
https://github.com/apache/spark/pull/14340). It would be better to use
Structured Streaming instead.

> So I would like to -1 this patch. I think it's been a mistake to support
> dstream in Python -- yes it satisfies a checkbox and Spark could claim
> there's support for streaming in Python. However, the tooling and maturity
> for working with streaming data (both in Spark and the more broad
> ecosystem) is simply not there. It is a big baggage to maintain, and
> creates a the wrong impression that production streaming jobs can be
> written in Python.
>

On Tue, Jul 4, 2017 at 10:53 PM, Daniel van der Ende <
daniel.vandere...@gmail.com> wrote:

> Hi,
>
> I'm working on integrating some pyspark code with Kafka. We'd like to use
> SSL/TLS, and so want to use Kafka 0.10. Because structured streaming is
> still marked alpha, we'd like to use Spark streaming. On this page,
> however, it indicates that the Kafka 0.10 integration in Spark does not
> support Python
> (https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
> I've been trying to figure out why, but have not been
> able to find anything. Is there any particular reason for this?
>
> Thanks,
>
> Daniel
>


Re: Question about standalone Spark cluster reading from Kerberosed hadoop

2017-06-23 Thread Saisai Shao
Spark running with the standalone cluster manager currently doesn't support
accessing secure Hadoop. Basically, the problem is that standalone mode
doesn't have the facility to distribute delegation tokens.

Currently only Spark on YARN or local mode supports secure Hadoop.

Thanks
Jerry

On Fri, Jun 23, 2017 at 5:10 PM, Mu Kong  wrote:

> Hi, all!
>
> I was trying to read from a Kerberosed hadoop cluster from a standalone
> spark cluster.
> Right now, I encountered some authentication issues with Kerberos:
>
>
> java.io.IOException: Failed on local exception: java.io.IOException: 
> org.apache.hadoop.security.AccessControlException: Client cannot authenticate 
> via:[TOKEN, KERBEROS]; Host Details : local host is: ""; 
> destination host is: XXX;
>
>
>
> I checked with klist, and the principal/realm is correct.
> I also used the hdfs command line to poke HDFS from all the nodes, and it
> worked.
> And if I submit the job using local (client) mode, the job works fine.
>
> I tried to put everything from hadoop/conf to spark/conf and hive/conf to
> spark/conf.
> I also tried editing spark/conf/spark-env.sh to add
> SPARK_SUBMIT_OPTS/SPARK_MASTER_OPTS/SPARK_SLAVE_OPTS/HADOOP_CONF_DIR/HIVE_CONF_DIR,
> and tried to export them in .bashrc as well.
>
> However, I'm still experiencing the same exception.
>
> Then I read some concerning posts about problems with
> kerberosed hadoop, such as the following one:
> http://blog.stratio.com/spark-kerberos-safe-story/
> which indicates that we cannot access kerberosed hdfs using a
> standalone spark cluster.
>
> I'm using spark 2.1.1. Is it still the case that we can't access
> kerberosed hdfs with 2.1.1?
>
> Thanks!
>
>
> Best regards,
> Mu
>
>


Re: Kerberos impersonation of a Spark Context at runtime

2017-05-04 Thread Saisai Shao
Spark currently doesn't support impersonating different users at runtime.
Spark's proxy user is currently application-level, which means that when it
is set through --proxy-user, the whole application runs as that user.

On Thu, May 4, 2017 at 5:13 PM, matd  wrote:

> Hi folks,
>
> I have a Spark application executing various jobs for different users
> simultaneously, via several Spark sessions on several threads.
>
> My customer would like to kerberize his Hadoop cluster. I wonder if there
> is a way to configure impersonation so that each of these jobs would run
> as a different proxy user. From what I see in the Spark conf and code,
> it's not possible to do that at runtime for a specific context, but I'm not
> familiar with Kerberos nor with this part of Spark.
>
> Can anyone confirm or refute this?
>
> Mathieu
>
> (also on S.O
> http://stackoverflow.com/questions/43765044/kerberos-impersonation-of-a-spark-context-at-runtime)
>
>
>
>
>


Re: Off heap memory settings and Tungsten

2017-04-24 Thread Saisai Shao
AFAIK, the off-heap memory settings are not enabled automatically.
Two configurations control Tungsten off-heap memory usage:

1. spark.memory.offHeap.enabled.
2. spark.memory.offHeap.size.
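
A small sketch of how the two settings go together: off-heap use has to be switched on explicitly and given a positive size. `off_heap_conf` and `as_submit_flags` are hypothetical helpers, and the 2 GiB figure is only an example value.

```python
def off_heap_conf(size_bytes):
    """Return the two settings needed to turn on off-heap memory."""
    if size_bytes <= 0:
        raise ValueError("spark.memory.offHeap.size must be positive when enabled")
    return {
        "spark.memory.offHeap.enabled": "true",
        "spark.memory.offHeap.size": str(size_bytes),
    }


def as_submit_flags(conf):
    """Turn a config mapping into spark-submit --conf arguments."""
    flags = []
    for key, value in sorted(conf.items()):
        flags += ["--conf", f"{key}={value}"]
    return flags


print(as_submit_flags(off_heap_conf(2 * 1024**3)))
# ['--conf', 'spark.memory.offHeap.enabled=true', '--conf', 'spark.memory.offHeap.size=2147483648']
```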



On Sat, Apr 22, 2017 at 7:44 PM, geoHeil  wrote:

> Hi,
> I wonder when to enable Spark's off-heap settings. Shouldn't Tungsten
> enable these automatically in 2.1?
> http://stackoverflow.com/questions/43330902/spark-off-heap-memory-config-and-tungsten
>
> Regards,
> Georg
>
>
>
>
>


Re: Spark API authentication

2017-04-14 Thread Saisai Shao
IIUC, the auth filter on the live UI REST API should already be supported;
the fix in SPARK-19652 is mainly for the History UI, to support
per-application ACLs.

For the application submission REST API in standalone mode, I think it is
currently not supported; it is not a bug.

On Fri, Apr 14, 2017 at 6:56 PM, Sergey Grigorev <grigorev-...@yandex.ru>
wrote:

> Thanks for help!
>
> I've found a ticket with a similar problem:
> https://issues.apache.org/jira/browse/SPARK-19652. It looks like this fix
> did not make it into the 2.1.0 release.
> You said that for the second example a custom filter is not supported. Is
> it a bug or expected behavior?
>
> On 14.04.2017 13:22, Saisai Shao wrote:
>
> AFAIK, for the first request a custom filter should work. But for the
> latter it is not supported.
>
> On Fri, Apr 14, 2017 at 6:17 PM, Sergey Grigorev <grigorev-...@yandex.ru>
> wrote:
>
>> GET requests like http://worker:4040/api/v1/applications or
>> http://master:6066/v1/submissions/status/driver-20170414025324- return a
>> successful result. But if I open the Spark master web UI, it requests a
>> username and password.
>>
>>
>> On 14.04.2017 12:46, Saisai Shao wrote:
>>
>> Hi,
>>
>> What specifically do you mean by "Spark API endpoint"?
>>
>> Filters only work with the Spark live and History web UIs.
>>
>> On Fri, Apr 14, 2017 at 5:18 PM, Sergey <grigorev-...@yandex.ru> wrote:
>>
>>> Hello all,
>>>
>>> I've added my own spark.ui.filters to enable basic authentication for
>>> access to the Spark web UI. It works fine, but I can still make requests to
>>> the Spark API without any authentication.
>>> Is there any way to enable authentication for API endpoints?
>>>
>>> P.S. spark version is 2.1.0, deploy mode is standalone.
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>


Re: Spark API authentication

2017-04-14 Thread Saisai Shao
AFAIK, for the first request a custom filter should work. But for the
latter it is not supported.

On Fri, Apr 14, 2017 at 6:17 PM, Sergey Grigorev <grigorev-...@yandex.ru>
wrote:

> GET requests like http://worker:4040/api/v1/applications or
> http://master:6066/v1/submissions/status/driver-20170414025324- return a
> successful result. But if I open the Spark master web UI, it requests a
> username and password.
>
>
> On 14.04.2017 12:46, Saisai Shao wrote:
>
> Hi,
>
> What specifically do you mean by "Spark API endpoint"?
>
> Filters only work with the Spark live and History web UIs.
>
> On Fri, Apr 14, 2017 at 5:18 PM, Sergey <grigorev-...@yandex.ru> wrote:
>
>> Hello all,
>>
>> I've added my own spark.ui.filters to enable basic authentication for
>> access to the Spark web UI. It works fine, but I can still make requests to
>> the Spark API without any authentication.
>> Is there any way to enable authentication for API endpoints?
>>
>> P.S. spark version is 2.1.0, deploy mode is standalone.
>>
>>
>>
>>
>>
>
>


Re: Spark API authentication

2017-04-14 Thread Saisai Shao
Hi,

What specifically do you mean by "Spark API endpoint"?

Filters only work with the Spark live and History web UIs.

On Fri, Apr 14, 2017 at 5:18 PM, Sergey  wrote:

> Hello all,
>
> I've added my own spark.ui.filters to enable basic authentication for
> access to the Spark web UI. It works fine, but I can still make requests to
> the Spark API without any authentication.
> Is there any way to enable authentication for API endpoints?
>
> P.S. spark version is 2.1.0, deploy mode is standalone.
>
>
>
>
>


Re: spark kafka consumer with kerberos

2017-03-31 Thread Saisai Shao
Hi Bill,

Normally a Kerberos principal and keytab should be enough, because the
keytab effectively stands in for the password. Did you configure SASL/GSSAPI
or SASL/PLAIN for KafkaClient?
http://kafka.apache.org/documentation.html#security_sasl

Actually, this is more of a Kafka question and is most likely a
configuration issue. I would suggest asking this question on the Kafka
mailing list.

Thanks
Saisai
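
For orientation, a keytab-based `KafkaClient` JAAS section usually has the shape rendered below, following the layout in the Kafka security documentation linked above. This is a sketch, not Bill's actual file: the principal is a placeholder, the relative keytab path assumes the keytab is shipped to each executor's working directory with `--files`, and `kafka_client_jaas` is a hypothetical helper.

```python
JAAS_TEMPLATE = """KafkaClient {{
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="{keytab}"
  principal="{principal}";
}};
"""


def kafka_client_jaas(keytab, principal):
    """Render a KafkaClient JAAS section that logs in from a keytab."""
    return JAAS_TEMPLATE.format(keytab=keytab, principal=principal)


# Placeholder principal; the keytab name matches the one used in this thread.
print(kafka_client_jaas("./v.keytab", "user@EXAMPLE.COM"))
```

With `useKeyTab=true` the login module reads the credentials from the keytab instead of prompting for a password, so an "Unable to obtain password from user" error often points at a keytab path that does not resolve on the executor.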


On Fri, Mar 31, 2017 at 10:28 PM, Bill Schwanitz <bil...@bilsch.org> wrote:

> Saisai,
>
> Yea that seems to have helped. Looks like the kerberos ticket when I
> submit does not get passed to the executor?
>
> ... 3 more
> Caused by: org.apache.kafka.common.KafkaException:
> javax.security.auth.login.LoginException: Unable to obtain password from
> user
>
> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
> at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
> ... 14 more
> Caused by: javax.security.auth.login.LoginException: Unable to obtain
> password from user
>
>
> On Fri, Mar 31, 2017 at 9:08 AM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> Hi Bill,
>>
>> The exception is from the executor side. From the gist you provided, it
>> looks like the issue is that you only configured the Java options on the
>> driver side; I think you should also configure them on the executor side.
>> You could refer to
>> https://github.com/hortonworks-spark/skc#running-on-a-kerberos-enabled-cluster
>>
>> --files key.conf#key.conf,v.keytab#v.keytab
>> --driver-java-options "-Djava.security.auth.login.config=./key.conf"
>> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf"
>>
>>
>> On Fri, Mar 31, 2017 at 1:58 AM, Bill Schwanitz <bil...@bilsch.org>
>> wrote:
>>
>>> I'm working on a poc spark job to pull data from a kafka topic with
>>> kerberos enabled ( required ) brokers.
>>>
>>> The code seems to connect to kafka and enter a polling mode. When I toss
>>> something onto the topic I get an exception which I just can't seem to
>>> figure out. Any ideas?
>>>
>>> I have a full gist up at
>>> https://gist.github.com/bilsch/17f4a4c4303ed3e004e2234a5904f0de with a
>>> lot of details. If I use the hdfs/spark client code for just normal
>>> operations everything works fine but for some reason the streaming code
>>> is having issues. I have verified the KafkaClient object is in the jaas
>>> config. The keytab is good etc.
>>>
>>> Guessing I'm doing something wrong I just have not figured out what yet!
>>> Any thoughts?
>>>
>>> The exception:
>>>
>>> 17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID
>>> 0, host5.some.org.net): org.apache.kafka.common.KafkaException: Failed
>>> to construct kafka consumer
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:47)
>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:157)
>>> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
>>> at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.kafka.common.KafkaException:
>>> org.apache.kafka.common.KafkaException: Jaas configuration not found
>>> at org.apache.kafka.common

Re: spark kafka consumer with kerberos

2017-03-31 Thread Saisai Shao
Hi Bill,

The exception is from the executor side. From the gist you provided, it looks
like you only configured the Java options on the driver side; I think you
should also configure them on the executor side. You could refer to
https://github.com/hortonworks-spark/skc#running-on-a-kerberos-enabled-cluster
for an example:

--files key.conf#key.conf,v.keytab#v.keytab
--driver-java-options "-Djava.security.auth.login.config=./key.conf"
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf"

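As a rough sketch of how those options fit together (written in Python purely for illustration; `key.conf` and `v.keytab` are the file names from the options above, while the helper name `kerberos_submit_args` is made up), the same JAAS file is shipped with --files and then referenced by both the driver and the executor JVM options:

```python
def kerberos_submit_args(jaas_file="key.conf", keytab="v.keytab"):
    """Build spark-submit arguments that point both JVMs at the JAAS file.

    Hypothetical helper: only the flags themselves come from the reply above.
    """
    jaas_opt = f"-Djava.security.auth.login.config=./{jaas_file}"
    return [
        # Ship the JAAS config and keytab into each container's working directory.
        "--files", f"{jaas_file}#{jaas_file},{keytab}#{keytab}",
        # Driver side.
        "--driver-java-options", jaas_opt,
        # Executor side: the setting the reply suggests was not configured.
        "--conf", f"spark.executor.extraJavaOptions={jaas_opt}",
    ]


args = kerberos_submit_args()
print(" ".join(args))
```

The executor option uses a relative path (./key.conf) because files passed via --files are placed in the container's working directory.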

On Fri, Mar 31, 2017 at 1:58 AM, Bill Schwanitz  wrote:

> I'm working on a poc spark job to pull data from a kafka topic with
> kerberos enabled ( required ) brokers.
>
> The code seems to connect to kafka and enter a polling mode. When I toss
> something onto the topic I get an exception which I just can't seem to
> figure out. Any ideas?
>
> I have a full gist up at
> https://gist.github.com/bilsch/17f4a4c4303ed3e004e2234a5904f0de
> with a lot of details. If I use the
> hdfs/spark client code for just normal operations everything works fine but
> for some reason the streaming code is having issues. I have verified the
> KafkaClient object is in the jaas config. The keytab is good etc.
>
> Guessing I'm doing something wrong I just have not figured out what yet!
> Any thoughts?
>
> The exception:
>
> 17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, host5.some.org.net): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:47)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:157)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
> at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Jaas configuration not found
> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
> at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
> ... 14 more
> Caused by: org.apache.kafka.common.KafkaException: Jaas configuration not found
> at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:299)
> at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
> at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
> at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
> ... 17 more
> Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in this configuration.
> at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:50)
> at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
> ... 21 more
>


Re: spark-submit config via file

2017-03-27 Thread Saisai Shao
Your HDFS URI is incomplete. Please look at the exception: the URI has no
host or port. Normally that would be OK if HDFS were your default FS.

I think the problem is that you're running on HDI, where the default FS is
wasb, so a short name without host:port leads to this error. This looks like
an HDI-specific issue; you'd better ask HDI.

Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no host: hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:154)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2791)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2825)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2807)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
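To see why the URI in the exception counts as "no host", here is a small Python sketch using only the standard library. It is not Hadoop's own check, just the same idea; the fully qualified URI uses a placeholder host and port that are not from the original report:

```python
from urllib.parse import urlsplit


def has_authority(uri):
    """Return True if the URI names a host (the part between '//' and the path)."""
    return bool(urlsplit(uri).netloc)


# The URI from the exception: scheme, then an empty authority, then the path.
short_uri = "hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz"
# Hypothetical fully qualified form; host and port are placeholders.
full_uri = "hdfs://namenode.example.com:8020/hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz"

print(has_authority(short_uri))
print(has_authority(full_uri))
```

With an empty authority, the client has to fall back to the default file system for the host, which only works when the default FS is itself HDFS.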




On Fri, Mar 24, 2017 at 9:18 PM, Yong Zhang  wrote:

> Of course it is possible.
>
>
> You can always set any configuration in your application using the API,
> instead of passing it in through the CLI.
>
>
> val sparkConf = new SparkConf().setAppName(properties.get("appName")).setMaster(
> properties.get("master")).set(xxx, properties.get("xxx"))
>
> Your error is your environment problem.
>
> Yong
> --
> *From:* , Roy 
> *Sent:* Friday, March 24, 2017 7:38 AM
> *To:* user
> *Subject:* spark-submit config via file
>
> Hi,
>
> I am trying to deploy spark job by using spark-submit which has bunch of
> parameters like
>
> spark-submit --class StreamingEventWriterDriver --master yarn
> --deploy-mode cluster --executor-memory 3072m --executor-cores 4 --files
> streaming.conf spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf
> "streaming.conf"
>
> I was looking for a way to put all these flags in a file to pass to
> spark-submit, to make my spark-submit command simple, like this
>
> spark-submit --class StreamingEventWriterDriver --master yarn
> --deploy-mode cluster --properties-file properties.conf --files
> streaming.conf spark_streaming_2.11-assembly-1.0-SNAPSHOT.jar -conf
> "streaming.conf"
>
> properties.conf has following contents
>
>
> spark.executor.memory 3072m
>
> spark.executor.cores 4
>
>
> But I am getting following error
>
>
> 17/03/24 11:36:26 INFO Client: Use hdfs cache file as spark.yarn.archive for HDP, hdfsCacheFile:hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz
> 17/03/24 11:36:26 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1
> 17/03/24 11:36:26 INFO AzureFileSystemThreadPoolExecutor: Time taken for Delete operation is: 1 ms with threads: 0
> 17/03/24 11:36:27 INFO Client: Deleted staging directory wasb://a...@abc.blob.core.windows.net/user/sshuser/.sparkStaging/application_1488402758319_0492
> Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no host: hdfs:///hdp/apps/2.6.0.0-403/spark2/spark2-hdp-yarn-archive.tar.gz
> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:154)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2791)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2825)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2807)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
> at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:364)
> at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:480)
> at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:552)
> at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:881)
> at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:170)
> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1218)
> at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1277)
> at org.apache.spark.deploy.yarn.Client.main(Client.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:745)
>
>

Re: question on Write Ahead Log (Spark Streaming )

2017-03-08 Thread Saisai Shao
IIUC, your scenario is quite similar to what ReliableKafkaReceiver currently
does. You should only send the ack to the upstream source after the data has
been persisted to the WAL. Otherwise, because data receiving and data
processing are asynchronous, there's still a chance that data could be lost
if you send the ack before the WAL write.

You could refer to ReliableKafkaReceiver.
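The ordering matters more than the API, so here is a toy Python model of it. This is not Spark's Receiver API: the class, the in-memory list standing in for the WAL, and the ack callback are all made up for illustration.

```python
class Receiver:
    """Toy receiver: persist each record to a write-ahead log, then ack."""

    def __init__(self):
        self.wal = []     # stands in for durable WAL storage
        self.events = []  # records the order of operations

    def on_record(self, record, ack):
        # 1. Persist first, so the record survives a receiver crash.
        self.wal.append(record)
        self.events.append(("wal", record))
        # 2. Only now tell the upstream source it may forget the record.
        ack(record)
        self.events.append(("ack", record))


acked = []
receiver = Receiver()
for rec in ["a", "b"]:
    receiver.on_record(rec, acked.append)
print(receiver.events)
```

If the two steps were swapped, a crash between them would leave a record that the source has dropped and the WAL never saw.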

On Thu, Mar 9, 2017 at 12:58 AM, kant kodali  wrote:

> Hi All,
>
> I am using a Receiver-based approach, and I understand that the Spark
> Streaming APIs will convert the data received from the receiver into blocks,
> and that these in-memory blocks are also stored in the WAL if one enables
> it. My upstream source, which is not Kafka, can also replay, by which I mean
> that if I don't send an ack to my upstream source it will resend the data, so
> I don't have to write the received data to the WAL. However, I still need to
> enable the WAL, correct? Because there are blocks in memory that need to be
> written to the WAL so they can be recovered later.
>
> Thanks,
> kant
>


Re: How to use ManualClock with Spark streaming

2017-02-28 Thread Saisai Shao
I don't think using ManualClock is the right way to fix your problem here in
Spark Streaming.

ManualClock in Spark is mainly used for unit tests, where the test manually
advances the time to make it work. That usage looks different from the
scenario you mentioned.
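To illustrate what "manually advance the time" means in a test, here is a minimal Python sketch of the idea. It is not Spark's ManualClock class; the class and the `batches_due` helper are hypothetical stand-ins.

```python
class ManualClock:
    """Toy clock whose time only moves when the test says so."""

    def __init__(self, start_ms=0):
        self._now = start_ms

    def now_ms(self):
        return self._now

    def advance(self, delta_ms):
        self._now += delta_ms
        return self._now


def batches_due(clock, batch_ms, last_batch_ms):
    """Number of whole batch intervals elapsed since the last batch."""
    return (clock.now_ms() - last_batch_ms) // batch_ms


clock = ManualClock()
clock.advance(2500)  # the test decides that 2.5 seconds have "passed"
print(batches_due(clock, batch_ms=1000, last_batch_ms=0))
```

Nothing moves such a clock unless test code calls advance, which is why it suits deterministic unit tests rather than shifting a live job's schedule.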

Thanks
Jerry

On Tue, Feb 28, 2017 at 10:53 PM, Hemalatha A <
hemalatha.amru...@googlemail.com> wrote:

>
> Hi,
>
> I am running streaming application reading data from kafka and performing
> window operations on it. I have a usecase where  all incoming events have a
> fixed latency of 10s, which means data belonging to minute 10:00:00 will
> arrive 10s late at 10:00:10.
>
> I want to set the spark clock to "ManualClock" and set the time behind by
> 10s so that the batch calculation triggers at 10:00:10, by which time
> all the events for the previous minute have arrived.
>
> But I see that "spark.streaming.clock" is hardcoded to
> "org.apache.spark.util.SystemClock" in the code.
>
> Is there an easy way to hack this property to use ManualClock?
> --
>
>
> Regards
> Hemalatha
>


Re: spark.speculation setting support on standalone mode?

2017-02-27 Thread Saisai Shao
I think they should be. These configurations don't depend on the specific
cluster manager the user chooses.



On Tue, Feb 28, 2017 at 4:42 AM, satishl  wrote:

> Are spark.speculation and related settings supported on standalone mode?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-speculation-setting-support-on-standalone-mode-tp28433.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Why spark history server does not show RDD even if it is persisted?

2017-02-22 Thread Saisai Shao
Logging it would be too verbose and would significantly increase the size of
the event log.

Here is the comment in the code:

> // No-op because logging every update would be overkill
> override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
>
>
On Thu, Feb 23, 2017 at 11:42 AM, Parag Chaudhari <paragp...@gmail.com>
wrote:

> Thanks a lot for the information!
>
> Is there any reason why EventLoggingListener ignores this event?
>
> *Thanks,*
>
>
> *Parag*
>
> On Wed, Feb 22, 2017 at 7:11 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> AFAIK, Spark's EventLoggingListener ignores the BlockUpdated event, so it
>> will not be written into the event log. I think that's why you cannot get
>> such info in the history server.
>>
>> On Thu, Feb 23, 2017 at 9:51 AM, Parag Chaudhari <paragp...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am running spark shell in spark version 2.0.2. Here is my program,
>>>
>>> var myrdd = sc.parallelize(Array.range(1, 10))
>>> myrdd.setName("test")
>>> myrdd.cache
>>> myrdd.collect
>>>
>>> But I am not able to see any RDD info in "storage" tab in spark history
>>> server.
>>>
>>> I looked at this
>>> <https://forums.databricks.com/questions/117/why-is-my-rdd-not-showing-up-in-the-storage-tab-of.html>
>>> but it is not helping, as I have exactly the same program as mentioned
>>> there. Can anyone help?
>>>
>>>
>>> *Thanks,*
>>>
>>> *Parag*
>>>
>>
>>
>


Re: Why spark history server does not show RDD even if it is persisted?

2017-02-22 Thread Saisai Shao
AFAIK, Spark's EventLoggingListener ignores the BlockUpdated event, so it will
not be written into the event log. I think that's why you cannot get such info
in the history server.

On Thu, Feb 23, 2017 at 9:51 AM, Parag Chaudhari 
wrote:

> Hi,
>
> I am running spark shell in spark version 2.0.2. Here is my program,
>
> var myrdd = sc.parallelize(Array.range(1, 10))
> myrdd.setName("test")
> myrdd.cache
> myrdd.collect
>
> But I am not able to see any RDD info in "storage" tab in spark history
> server.
>
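> I looked at this
> <https://forums.databricks.com/questions/117/why-is-my-rdd-not-showing-up-in-the-storage-tab-of.html>
> but it is not helping, as I have exactly the same program as mentioned
> there. Can anyone help?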
>
>
> *Thanks,*
>
> *Parag*
>


Re: Remove dependence on HDFS

2017-02-13 Thread Saisai Shao
IIUC Spark isn't strongly bound to HDFS. It uses a common FileSystem layer
which supports different FS implementations, and HDFS is just one option. You
could also use S3 as a backend FS; from Spark's point of view, the different
FS implementations are transparent.



On Sun, Feb 12, 2017 at 5:32 PM, ayan guha  wrote:

> How about adding more NFS storage?
>
> On Sun, 12 Feb 2017 at 8:14 pm, Sean Owen  wrote:
>
>> Data has to live somewhere -- how do you not add storage but store more
>> data?  Alluxio is not persistent storage, and S3 isn't on your premises.
>>
>> On Sun, Feb 12, 2017 at 4:29 AM Benjamin Kim  wrote:
>>
>> Has anyone got some advice on how to remove the reliance on HDFS for
>> storing persistent data. We have an on-premise Spark cluster. It seems like
>> a waste of resources to keep adding nodes because of a lack of storage
>> space only. I would rather add more powerful nodes due to the lack of
>> processing power at a less frequent rate, than add less powerful nodes at a
>> more frequent rate just to handle the ever growing data. Can anyone point
>> me in the right direction? Is Alluxio a good solution? S3? I would like to
>> hear your thoughts.
>>
>> Cheers,
>> Ben
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: Livy with Spark

2016-12-07 Thread Saisai Shao
Hi Mich,

1. Each user could create a Livy session (batch or interactive). One
session is backed by one Spark application, and the resource quota is the
same as for a normal Spark application (configured by
spark.executor.cores, spark.executor.memory, etc.); this will be passed to
YARN if running on YARN. This is basically no different from a normal Spark
application.
2. Livy is just a REST service. Inside Livy you could create multiple
sessions; each session is mapped to one Spark application, with resources
managed by the cluster manager (like YARN).
3. One user could create one session, and this session will run as that user.
It is similar to HiveServer's user impersonation or proxy user.
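As a sketch of points 1 and 3, this is roughly what the JSON body of a session-creation request (POST /sessions) could look like, built in Python without sending anything. The user name and resource numbers are made-up values; the field names are the ones I recall from Livy's REST API, so please check them against the docs for your Livy version.

```python
import json


def livy_session_request(user, executor_cores=2, executor_memory="2g", num_executors=4):
    """Build the JSON body for creating a Livy session (values are illustrative)."""
    return {
        "kind": "spark",                     # interactive Scala session
        "proxyUser": user,                   # the session runs as this user
        "executorCores": executor_cores,     # same knobs as a normal Spark app
        "executorMemory": executor_memory,
        "numExecutors": num_executors,
    }


body = livy_session_request("alice")
print(json.dumps(body, indent=2))
```

Each such request results in its own Spark application, so YARN queues and limits apply to it just as they would to a job submitted with spark-submit.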



On Wed, Dec 7, 2016 at 5:51 AM, Mich Talebzadeh 
wrote:

> Thanks Richard.
>
> I saw your question in the above blog:
>
> How does Livy proxy the user? Per task? Do you know how quotas are
> assigned to users, like how do you stop one Livy user from using all of the
> resources available to the Executors?
>
> My points are:
>
>
> 1. I still don't understand how quotas are assigned to users. Is that
>    done by YARN?
> 2. What will happen if more than one Livy is running on the same
>    cluster, all controlled by the same YARN? How are resources allocated?
>
> cheers
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 5 December 2016 at 14:50, Richard Startin 
> wrote:
>
>> There is a great write up on Livy at
>> http://henning.kropponline.de/2016/11/06/
>>
>> On 5 Dec 2016, at 14:34, Mich Talebzadeh 
>> wrote:
>>
>> Hi,
>>
>> Has there been any experience using Livy with Spark to share multiple
>> Spark contexts?
>>
>> thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>


Re: spark.yarn.executor.memoryOverhead

2016-11-23 Thread Saisai Shao
From my understanding, this memory overhead should include
"spark.memory.offHeap.size", which means the off-heap memory size should not
be larger than the overhead memory size when running on YARN.
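A quick worked example of that constraint, sketched in Python. The two constants are assumptions on my part (a 0.10 factor and a 384 MB floor, which I believe were the Spark 2.0-era defaults), and the helper names are hypothetical:

```python
MEMORY_OVERHEAD_FACTOR = 0.10  # assumed default factor
MEMORY_OVERHEAD_MIN_MB = 384   # assumed default floor, in MB


def default_overhead_mb(executor_memory_mb):
    """Default overhead: a fraction of executor memory, with a fixed floor."""
    return max(int(MEMORY_OVERHEAD_FACTOR * executor_memory_mb), MEMORY_OVERHEAD_MIN_MB)


def off_heap_fits(executor_memory_mb, off_heap_mb):
    """True if the off-heap size stays within the default overhead."""
    return off_heap_mb <= default_overhead_mb(executor_memory_mb)


for mem in (2048, 4096, 8192):
    print(mem, default_overhead_mb(mem), off_heap_fits(mem, 512))
```

So with the default overhead, an off-heap size of 512 MB would not fit for a 4 GB executor, and spark.yarn.executor.memoryOverhead would need to be raised explicitly.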

On Thu, Nov 24, 2016 at 3:01 AM, Koert Kuipers  wrote:

> in YarnAllocator i see that memoryOverhead is by default set to
> math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt,
> MEMORY_OVERHEAD_MIN)
>
> this does not take into account spark.memory.offHeap.size i think. should
> it?
>
> something like:
>
> math.max((MEMORY_OVERHEAD_FACTOR * executorMemory + offHeapMemory).toInt,
> MEMORY_OVERHEAD_MIN)
>


Re: dataframe data visualization

2016-11-20 Thread Saisai Shao
You might take a look at this project (https://github.com/vegas-viz/Vegas),
it has Spark integration.

Thanks
Saisai

On Mon, Nov 21, 2016 at 10:23 AM, wenli.o...@alibaba-inc.com <
wenli.o...@alibaba-inc.com> wrote:

> Hi anyone,
>
> is there any easy way for me to do data visualization in spark using scala
> when data is in dataframe format? Thanks.
>
> Wayne Ouyang


Re: spark pi example fail on yarn

2016-10-20 Thread Saisai Shao
It is not that Spark has difficulty communicating with YARN; it simply means
the AM exited with FINISHED state.

I'm guessing it might be related to memory constraints for the container.
Please check the YARN RM and NM logs to find out more details.

Thanks
Saisai

On Fri, Oct 21, 2016 at 8:14 AM, Xi Shen <davidshe...@gmail.com> wrote:

> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
> application has already exited with state FINISHED!
>
> From this, I think Spark is having difficulty communicating with YARN. You
> should check your Spark log.
>
>
> On Fri, Oct 21, 2016 at 8:06 AM Li Li <fancye...@gmail.com> wrote:
>
> which log file should I
>
> On Thu, Oct 20, 2016 at 10:02 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
> > Looks like ApplicationMaster is killed by SIGTERM.
> >
> > 16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
> > 16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:
> >
> > This container may be killed by yarn NodeManager or other processes,
> > you'd better check yarn log to dig out more details.
> >
> > Thanks
> > Saisai
> >
> > On Thu, Oct 20, 2016 at 6:51 PM, Li Li <fancye...@gmail.com> wrote:
> >>
> >> I am setting up a small yarn/spark cluster. hadoop/yarn version is
> >> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
> >> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
> >> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
> >> org.apache.spark.examples.SparkPi --master yarn-client
> >> examples/jars/spark-examples_2.11-2.0.1.jar 1
> >> it fails and the first error is:
> >> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
> >> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
> >> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO
> >> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
> >> registered as NettyRpcEndpointRef(null)
> >> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
> >> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
> >> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
> >> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
> >> /proxy/application_1476957324184_0002
> >> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
> >> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
> >> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
> >> SchedulerBackend is ready for scheduling beginning after waiting
> >> maxRegisteredResourcesWaitingTime: 3(ms)
> >> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
> >> SparkContext, some configuration may not take effect.
> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
> >> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
> >> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
> >> SparkPi.scala:38
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
> >> SparkPi.scala:38) with 1 output partitions
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
> >> ResultStage 0 (reduce at SparkPi.scala:38)
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage:
> >> List()
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
> >> 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
> >> missing parents
> >> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
> >> values in memory (estimated size 1832.0 B, free 366.3 MB)

Re: spark pi example fail on yarn

2016-10-20 Thread Saisai Shao
Looks like the ApplicationMaster was killed by SIGTERM.

16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:

This container may have been killed by the YARN NodeManager or another
process; you'd better check the YARN logs to dig out more details.

Thanks
Saisai

On Thu, Oct 20, 2016 at 6:51 PM, Li Li  wrote:

> I am setting up a small yarn/spark cluster. hadoop/yarn version is
> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
> org.apache.spark.examples.SparkPi --master yarn-client
> examples/jars/spark-examples_2.11-2.0.1.jar 1
> it fails and the first error is:
> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
> 16/10/20 18:12:12 INFO
> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
> registered as NettyRpcEndpointRef(null)
> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
> /proxy/application_1476957324184_0002
> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
> SchedulerBackend is ready for scheduling beginning after waiting
> maxRegisteredResourcesWaitingTime: 3(ms)
> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
> SparkContext, some configuration may not take effect.
> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,null,AVAILABLE}
> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
> SparkPi.scala:38
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
> SparkPi.scala:38) with 1 output partitions
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
> ResultStage 0 (reduce at SparkPi.scala:38)
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage:
> List()
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
> 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
> missing parents
> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
> values in memory (estimated size 1832.0 B, free 366.3 MB)
> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0_piece0
> stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB)
> 16/10/20 18:12:13 INFO storage.BlockManagerInfo: Added
> broadcast_0_piece0 in memory on 10.161.219.189:39161 (size: 1169.0 B,
> free: 366.3 MB)
> 16/10/20 18:12:13 INFO spark.SparkContext: Created broadcast 0 from
> broadcast at DAGScheduler.scala:1012
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting 1
> missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
> SparkPi.scala:34)
> 16/10/20 18:12:13 INFO cluster.YarnScheduler: Adding task set 0.0 with
> 1 tasks
> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
> application has already exited with state FINISHED!
> 16/10/20 18:12:14 INFO server.ServerConnector: Stopped
> ServerConnector@389adf1d{HTTP/1.1}{0.0.0.0:4040}
> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@841e575{/stages/stage/kill,null,UNAVAILABLE}
> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@66629f63{/api,null,UNAVAILABLE}
> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@2b62442c{/,null,UNAVAILABLE}
>
>
> I also use yarn log to get logs from yarn(total log is very lengthy in
> attachement):
> 16/10/20 18:12:03 INFO yarn.ExecutorRunnable:
> 
> ===
> YARN executor launch context:
>   env:
> CLASSPATH ->
> {{PWD}}{{PWD}}/__spark_conf__{{PWD}}/__spark_
> 

Re: NoClassDefFoundError: org/apache/spark/Logging in SparkSession.getOrCreate

2016-10-17 Thread Saisai Shao
Not sure why your code searches for the Logging class under org/apache/spark;
it should be “org/apache/spark/internal/Logging”, and that changed a long
time ago. Most likely some jar on your classpath was built against Spark 1.x.


On Sun, Oct 16, 2016 at 3:25 AM, Brad Cox  wrote:

> I'm experimenting with Spark 2.0.1 for the first time and hitting a
> problem right out of the gate.
>
> My main routine starts with this which I think is the standard idiom.
>
> SparkSession sparkSession = SparkSession
> .builder()
> .master("local")
> .appName("DecisionTreeExample")
> .getOrCreate();
>
> Running this in the eclipse debugger, execution fails in getOrCreate()
> with this exception
>
> Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:122)
> at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:77)
> at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:840)
> at titanic.DecisionTreeExample.main(DecisionTreeExample.java:54)
>
> java.lang.NoClassDefFoundError means a class is not found at run time that
> was present at
> compile time. I've googled everything I can think of and found no
> solutions. Can someone
> help? Thanks!
>
> These are my spark-relevant dependencies:
>
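> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-core_2.11</artifactId>
>   <version>2.0.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-mllib_2.11</artifactId>
>   <version>2.0.1</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.11</artifactId>
>   <version>2.0.1</version>
> </dependency>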
>
>
>
> Dr. Brad J. CoxCell: 703-594-1883 Skype: dr.brad.cox
>
>
>
>
>
>
>


Re: spark with kerberos

2016-10-13 Thread Saisai Shao
I think security has nothing to do with which API you use, Spark SQL or the
RDD API.

I'm assuming you're running on a YARN cluster (currently the only cluster
manager that supports Kerberos).

First you need to get a Kerberos TGT in your local spark-submit process.
After being authenticated by Kerberos, Spark can get delegation tokens
from HDFS, so that you can communicate with the secure Hadoop cluster. In
your case, since you have to communicate with other remote HDFS clusters,
you have to get tokens from all of those remote clusters. You could
configure "spark.yarn.access.namenodes" to list all the secure HDFS
clusters you want to access; the Hadoop client API will then get tokens from
all of them.

For the details you could refer to
https://spark.apache.org/docs/latest/running-on-yarn.html.
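For illustration only, here is how the "spark.yarn.access.namenodes" value could be assembled, sketched in Python. The namenode address is a placeholder I made up, and the helper name is hypothetical; the setting takes a comma-separated list of secure HDFS namenode URIs.

```python
def access_namenodes_conf(namenode_uris):
    """Format the --conf value listing every secure HDFS to fetch tokens for."""
    return "spark.yarn.access.namenodes=" + ",".join(namenode_uris)


# Placeholder address; substitute the real namenode of each secure cluster.
secure_clusters = [
    "hdfs://nn-cluster-c.example.com:8020",
]
submit_args = ["--master", "yarn", "--conf", access_namenodes_conf(secure_clusters)]
print(" ".join(submit_args))
```

Only Kerberized clusters need to appear in the list, since no delegation token is required for an unsecured HDFS.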

I didn't try personally since I don't have such requirements. It may
requires additional steps which I missed. You could take a a try.
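
To make the steps above concrete, here is a minimal Python sketch that only
assembles a spark-submit command line for this kind of setup. It is an
illustration under assumptions, not a tested recipe: the principal, keytab
path, namenode address, jar and class names are hypothetical placeholders.
The --principal/--keytab options are the alternative to running kinit by
hand before submitting.

```python
import shlex


def build_submit_command(principal, keytab, namenodes, app_jar, main_class):
    """Assemble a spark-submit argument list for YARN with Kerberos.

    Every value passed in is a placeholder chosen by the caller.
    """
    return [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        # Let Spark log in from a keytab; running kinit first is the alternative.
        "--principal", principal,
        "--keytab", keytab,
        # Comma-separated list of the secure HDFS clusters to fetch tokens for.
        "--conf", "spark.yarn.access.namenodes=" + ",".join(namenodes),
        "--class", main_class,
        app_jar,
    ]


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    cmd = build_submit_command(
        "user@EXAMPLE.COM",
        "/path/to/user.keytab",
        ["hdfs://nn-cluster-c.example.com:8020"],
        "app.jar",
        "com.example.Main",
    )
    print(" ".join(shlex.quote(part) for part in cmd))
```

Only the kerberized clusters need to be listed; in the scenario described in
this thread that would presumably be `cluster C` alone.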


On Thu, Oct 13, 2016 at 6:38 PM, Denis Bolshakov 
wrote:

> The problem happens when writing (reading works fine)
>
> rdd.saveAsNewAPIHadoopFile
>
> We use just RDD and HDFS, no other things.
> Spark 1.6.1 version.
> `Cluster A` - CDH 5.7.1
> `Cluster B` - vanilla hadoop 2.6.5
> `Cluster C` - CDH 5.8.0
>
> Best regards,
> Denis
>
> On 13 October 2016 at 13:06, ayan guha  wrote:
>
>> And a little more details on Spark version, hadoop version and
>> distribution would also help...
>>
>> On Thu, Oct 13, 2016 at 9:05 PM, ayan guha  wrote:
>>
>>> I think one point you need to mention is your target - HDFS, Hive or
>>> Hbase (or something else) and which end points are used.
>>>
>>> On Thu, Oct 13, 2016 at 8:50 PM, dbolshak 
>>> wrote:
>>>
 Hello community,

 We've a challenge and no ideas how to solve it.

 The problem,

 Say we have the following environment:
 1. `cluster A`, the cluster does not use kerberos and we use it as a
 source
 of data, important thing is - we don't manage this cluster.
 2. `cluster B`, small cluster where our spark application is running and
 performing some logic. (we manage this cluster and it does not have
 kerberos).
 3. `cluster C`, the cluster uses kerberos and we use it to keep results
 of
 our spark application, we manage this cluster

 Our requirements and conditions that are not mentioned yet:
 1. All clusters are in a single data center, but in the different
 subnetworks.
 2. We cannot turn on kerberos on `cluster A`
 3. We cannot turn off kerberos on `cluster C`
 4. We can turn on/off kerberos on `cluster B`, currently it's turned
 off.
 5. Spark app is built on top of RDD and does not depend on spark-sql.

 Does anybody know how to write data using RDD api to remote cluster
 which is
 running with Kerberos?

 --
 //with Best Regards
 --Denis Bolshakov
 e-mail: bolshakov.de...@gmail.com



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/spark-with-kerberos-tp27894.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> //with Best Regards
> --Denis Bolshakov
> e-mail: bolshakov.de...@gmail.com
>
>
>


Re: Spark metrics when running with YARN?

2016-09-17 Thread Saisai Shao
Hi Vladimir,

I think you are mixing up the cluster manager and the Spark application
running on it. The master and workers are the two components of the
Standalone cluster manager; the YARN counterparts are the RM and NM. The
URLs you listed above only work for the Standalone master and workers.

It would be clearer if you tried running simple Spark applications on both
Standalone and YARN.

On Fri, Sep 16, 2016 at 10:32 PM, Vladimir Tretyakov <
vladimir.tretya...@sematext.com> wrote:

> Hello.
>
> Found that there is also Spark metric Sink like MetricsServlet.
> which is enabled by default:
>
> https://apache.googlesource.com/spark/+/refs/heads/master/
> core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala#40
>
> Tried urls:
>
> On master:
> http://localhost:8080/metrics/master/json/
> http://localhost:8080/metrics/applications/json
>
> On slaves (with workers):
> http://localhost:4040/metrics/json/
>
> got information I need.
>
> Questions:
> 1. Will URLs for master work in YARN (client/server mode) and Mesos modes?
> Or this is only for Standalone mode?
> 2. Will URL for slave also work for modes other than Standalone?
>
> Why are there 2 ways to get information, REST API and this Sink?
>
>
> Best regards, Vladimir.
>
>
>
>
>
>
> On Mon, Sep 12, 2016 at 3:53 PM, Vladimir Tretyakov <
> vladimir.tretya...@sematext.com> wrote:
>
>> Hello Saisai Shao, Jacek Laskowski , thx for information.
>>
>> We are working on Spark monitoring tool and our users have different
>> setup modes (Standalone, Mesos, YARN).
>>
>> Looked at code, found:
>>
>> /**
>>  * Attempt to start a Jetty server bound to the supplied hostName:port using the given
>>  * context handlers.
>>  *
>>  * If the desired port number is contended, continues incrementing ports until a free port is
>>  * found. Return the jetty Server object, the chosen port, and a mutable collection of handlers.
>>  */
>>
>> It seems most generic way (which will work for most users) will be start
>> looking at ports:
>>
>> spark.ui.port (4040 by default)
>> spark.ui.port + 1
>> spark.ui.port + 2
>> spark.ui.port + 3
>> ...
>>
>> Until we will get responses from Spark.
>>
>> PS: yeah they may be some intersections with some other applications for
>> some setups, in this case we may ask users about these exceptions and do
>> our housework around them.
>>
>> Best regards, Vladimir.
>>
>> On Mon, Sep 12, 2016 at 12:07 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> Here is the yarn RM REST API for you to refer (
>>> http://hadoop.apache.org/docs/r2.7.0/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html).
>>> You can use these APIs to query applications running on yarn.
>>>
>>> On Sun, Sep 11, 2016 at 11:25 PM, Jacek Laskowski <ja...@japila.pl>
>>> wrote:
>>>
>>>> Hi Vladimir,
>>>>
>>>> You'd have to talk to your cluster manager to query for all the
>>>> running Spark applications. I'm pretty sure YARN and Mesos can do that
>>>> but unsure about Spark Standalone. This is certainly not something a
>>>> Spark application's web UI could do for you since it is designed to
>>>> handle the single Spark application.
>>>>
>>>> Pozdrawiam,
>>>> Jacek Laskowski
>>>> 
>>>> https://medium.com/@jaceklaskowski/
>>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>>> Follow me at https://twitter.com/jaceklaskowski
>>>>
>>>>
>>>> On Sun, Sep 11, 2016 at 11:18 AM, Vladimir Tretyakov
>>>> <vladimir.tretya...@sematext.com> wrote:
>>>> > Hello Jacek, thx a lot, it works.
>>>> >
>>>> > Is there a way how to get list of running applications from REST API?
>>>> Or I
>>>> > have to try connect 4040 4041... 40xx ports and check if ports answer
>>>> > something?
>>>> >
>>>> > Best regards, Vladimir.
>>>> >
>>>> > On Sat, Sep 10, 2016 at 6:00 AM, Jacek Laskowski <ja...@japila.pl>
>>>> wrote:
>>>> >>
>>>> >> Hi,
>>>> >>
>>>> >> That's correct. One app one web UI. Open 4041 and you'll see the
>>>> other
>>>> >> app.
>>>> >>
>>>> >> Jacek
>>>> >>
>>>> >>
>>>> >> On 9 Sep 2

Re: Spark metrics when running with YARN?

2016-09-12 Thread Saisai Shao
Here is the yarn RM REST API for you to refer (
http://hadoop.apache.org/docs/r2.7.0/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html).
You can use these APIs to query applications running on yarn.
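
As a rough sketch of how that API could be used to list running Spark
applications, the Python below calls the RM's /ws/v1/cluster/apps endpoint
with the "states" and "applicationTypes" query parameters. The RM address is
a hypothetical placeholder and error handling is left out; treat it as a
starting point rather than a finished client.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def running_spark_apps_url(rm_base_url):
    """Build the RM REST URL that lists running applications of type SPARK."""
    query = urlencode({"states": "RUNNING", "applicationTypes": "SPARK"})
    return rm_base_url.rstrip("/") + "/ws/v1/cluster/apps?" + query


def parse_apps(payload):
    """Extract (id, name, trackingUrl) tuples from an RM 'apps' response.

    The RM returns "apps": null when nothing matches, so guard against that.
    """
    apps = (payload.get("apps") or {}).get("app") or []
    return [(a["id"], a["name"], a.get("trackingUrl")) for a in apps]


def list_running_spark_apps(rm_base_url, timeout=10):
    """Query the RM and return the parsed list of running Spark applications."""
    with urlopen(running_spark_apps_url(rm_base_url), timeout=timeout) as resp:
        return parse_apps(json.load(resp))


if __name__ == "__main__":
    # Hypothetical RM address; 8088 is the usual RM web port.
    print(running_spark_apps_url("http://rm.example.com:8088"))
```

The trackingUrl of each entry points at that application's own web UI, which
avoids guessing ports such as 4040, 4041 and so on.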

On Sun, Sep 11, 2016 at 11:25 PM, Jacek Laskowski  wrote:

> Hi Vladimir,
>
> You'd have to talk to your cluster manager to query for all the
> running Spark applications. I'm pretty sure YARN and Mesos can do that
> but unsure about Spark Standalone. This is certainly not something a
> Spark application's web UI could do for you since it is designed to
> handle the single Spark application.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sun, Sep 11, 2016 at 11:18 AM, Vladimir Tretyakov
>  wrote:
> > Hello Jacek, thx a lot, it works.
> >
> > Is there a way how to get list of running applications from REST API? Or
> I
> > have to try connect 4040 4041... 40xx ports and check if ports answer
> > something?
> >
> > Best regards, Vladimir.
> >
> > On Sat, Sep 10, 2016 at 6:00 AM, Jacek Laskowski 
> wrote:
> >>
> >> Hi,
> >>
> >> That's correct. One app one web UI. Open 4041 and you'll see the other
> >> app.
> >>
> >> Jacek
> >>
> >>
> >> On 9 Sep 2016 11:53 a.m., "Vladimir Tretyakov"
> >>  wrote:
> >>>
> >>> Hello again.
> >>>
> >>> I am trying to play with Spark version "2.11-2.0.0".
> >>>
> >>> The problem is that the REST API and the UI show me different things.
> >>>
> >>> I've started 2 applications from the "examples set": I opened 2 consoles
> >>> and ran the following command in each:
> >>>
> >>> ./bin/spark-submit   --class org.apache.spark.examples.SparkPi
>  --master
> >>> spark://wawanawna:7077  --executor-memory 2G  --total-executor-cores 30
> >>> examples/jars/spark-examples_2.11-2.0.0.jar  1
> >>>
> >>> Request to API endpoint:
> >>>
> >>> http://localhost:4040/api/v1/applications
> >>>
> >>> returned me following JSON:
> >>>
> >>> [ {
> >>>   "id" : "app-20160909184529-0016",
> >>>   "name" : "Spark Pi",
> >>>   "attempts" : [ {
> >>> "startTime" : "2016-09-09T15:45:25.047GMT",
> >>> "endTime" : "1969-12-31T23:59:59.999GMT",
> >>> "lastUpdated" : "2016-09-09T15:45:25.047GMT",
> >>> "duration" : 0,
> >>> "sparkUser" : "",
> >>> "completed" : false,
> >>> "startTimeEpoch" : 1473435925047,
> >>> "endTimeEpoch" : -1,
> >>> "lastUpdatedEpoch" : 1473435925047
> >>>   } ]
> >>> } ]
> >>>
> >>> so response contains information only about 1 application.
> >>>
> >>> But in reality I've started 2 applications and Spark UI shows me 2
> >>> RUNNING application (please see screenshot).
> >>>
> >>> Does anybody maybe know answer why API and UI shows different things?
> >>>
> >>>
> >>> Best regards, Vladimir.
> >>>
> >>>
> >>> On Tue, Aug 30, 2016 at 3:52 PM, Vijay Kiran 
> wrote:
> 
>  Hi Otis,
> 
>  Did you check the REST API as documented in
>  http://spark.apache.org/docs/latest/monitoring.html
> 
>  Regards,
>  Vijay
> 
>  > On 30 Aug 2016, at 14:43, Otis Gospodnetić
>  >  wrote:
>  >
>  > Hi Mich and Vijay,
>  >
>  > Thanks!  I forgot to include an important bit - I'm looking for a
>  > programmatic way to get Spark metrics when running Spark under YARN
> - so JMX
>  > or API of some kind.
>  >
>  > Thanks,
>  > Otis
>  > --
>  > Monitoring - Log Management - Alerting - Anomaly Detection
>  > Solr & Elasticsearch Consulting Support Training -
>  > http://sematext.com/
>  >
>  >
>  > On Tue, Aug 30, 2016 at 6:59 AM, Mich Talebzadeh
>  >  wrote:
>  > Spark UI regardless of deployment mode Standalone, yarn etc runs on
>  > port 4040 by default that can be accessed directly
>  >
>  > Otherwise one can specify a specific port with --conf
>  > "spark.ui.port=5" for example 5
>  >
>  > HTH
>  >
>  > Dr Mich Talebzadeh
>  >
>  > LinkedIn
>  > https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  >
>  > http://talebzadehmich.wordpress.com
>  >
>  > Disclaimer: Use it at your own risk. Any and all responsibility for
>  > any loss, damage or destruction of data or any other property which
> may
>  > arise from relying on this email's technical content is explicitly
>  > disclaimed. The author will in no case be liable for any monetary
> damages
>  > arising from such loss, damage or destruction.
>  >
>  >
>  > On 30 August 2016 at 11:48, Vijay Kiran 
> wrote:
>  >
>  > From YARN RM UI, find the spark application Id, and in the
> application
>  > details, you can click on the “Tracking URL” which 

Re: Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Saisai Shao
Based on my limited knowledge of Tachyon (Alluxio), it only provides a
Hadoop-compatible FileSystem API layer, which means it cannot be used as a
shuffle data store. If it can be mounted as an OS-supported FS layer, like
NFS or FUSE, then it can be used as a shuffle data store.

But never neglect the overhead and stability of a distributed FS (RPC
communication, network latency), since shuffle is quite critical.

On Wed, Aug 24, 2016 at 10:30 PM, tony@tendcloud.com <
tony@tendcloud.com> wrote:

> Hi, Saisai and Rui,
> Thanks a lot for your answer.  Alluxio tried to work as the middle layer
> between storage and Spark, so is it possible to use Alluxio to resolve the
> issue? We want to have 1 SSD for every datanode and use Alluxio to manage
> mem,ssd and hdd.
>
> Thanks and Regards,
> Tony
>
> --
> tony@tendcloud.com
>
>
> *From:* Sun Rui <sunrise_...@163.com>
> *Date:* 2016-08-24 22:17
> *To:* Saisai Shao <sai.sai.s...@gmail.com>
> *CC:* tony@tendcloud.com; user <user@spark.apache.org>
> *Subject:* Re: Can we redirect Spark shuffle spill data to HDFS or
> Alluxio?
> Yes, I also tried FUSE before, it is not stable and I don’t recommend it
>
> On Aug 24, 2016, at 22:15, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
> Also fuse is another candidate (https://wiki.apache.org/
> hadoop/MountableHDFS), but not so stable as I tried before.
>
> On Wed, Aug 24, 2016 at 10:09 PM, Sun Rui <sunrise_...@163.com> wrote:
>
>> For HDFS, maybe you can try mount HDFS as NFS. But not sure about the
>> stability, and also there is additional overhead of network I/O and replica
>> of HDFS files.
>>
>> On Aug 24, 2016, at 21:02, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>
>> Spark shuffle uses the Java File API to create local dirs and read/write
>> data, so it only works with an OS-supported FS. It doesn't use the
>> Hadoop FileSystem API, so writing to a Hadoop-compatible FS does not work.
>>
>> Also, it is not suitable to write temporary shuffle data to a distributed
>> FS; this brings unnecessary overhead. In your case, if you have a lot of
>> memory on each node, you could use ramfs instead to store shuffle data.
>>
>> Thanks
>> Saisai
>>
>> On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com <
>> tony@tendcloud.com> wrote:
>>
>>> Hi, All,
>>> When we run Spark on very large data, spark will do shuffle and the
>>> shuffle data will write to local disk. Because we have limited capacity at
>>> local disk, the shuffled data will occupied all of the local disk and then
>>> will be failed.  So is there a way we can write the shuffle spill data to
>>> HDFS? Or if we introduce alluxio in our system, can the shuffled data write
>>> to alluxio?
>>>
>>> Thanks and Regards,
>>>
>>> --
>>> 阎志涛(Tony)
>>>
>>> 北京腾云天下科技有限公司
>>> 
>>> 
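>>> Email: tony@tendcloud.com
>>> Phone: 13911815695
>>> WeChat: zhitao_yan
>>> QQ: 4707059
>>> Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
>>> Postal code: 100027
>>> 
>>> 
>>> TalkingData.com <http://talkingdata.com/> - Let the data speak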
>>>
>>
>>
>>
>
>


Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Saisai Shao
Also fuse is another candidate (https://wiki.apache.org/hadoop/MountableHDFS),
but not so stable as I tried before.

On Wed, Aug 24, 2016 at 10:09 PM, Sun Rui <sunrise_...@163.com> wrote:

> For HDFS, maybe you can try mount HDFS as NFS. But not sure about the
> stability, and also there is additional overhead of network I/O and replica
> of HDFS files.
>
> On Aug 24, 2016, at 21:02, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
> Spark shuffle uses the Java File API to create local dirs and read/write
> data, so it only works with an OS-supported FS. It doesn't use the
> Hadoop FileSystem API, so writing to a Hadoop-compatible FS does not work.
>
> Also, it is not suitable to write temporary shuffle data to a distributed
> FS; this brings unnecessary overhead. In your case, if you have a lot of
> memory on each node, you could use ramfs instead to store shuffle data.
>
> Thanks
> Saisai
>
> On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com <
> tony@tendcloud.com> wrote:
>
>> Hi, All,
>> When we run Spark on very large data, spark will do shuffle and the
>> shuffle data will write to local disk. Because we have limited capacity at
>> local disk, the shuffled data will occupied all of the local disk and then
>> will be failed.  So is there a way we can write the shuffle spill data to
>> HDFS? Or if we introduce alluxio in our system, can the shuffled data write
>> to alluxio?
>>
>> Thanks and Regards,
>>
>> --
>> 阎志涛(Tony)
>>
>> 北京腾云天下科技有限公司
>> -
>> ---
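>> Email: tony@tendcloud.com
>> Phone: 13911815695
>> WeChat: zhitao_yan
>> QQ: 4707059
>> Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
>> Postal code: 100027
>> 
>> 
>> TalkingData.com <http://talkingdata.com/> - Let the data speak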
>>
>
>
>


Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Saisai Shao
Spark shuffle uses the Java File API to create local dirs and read/write
data, so it only works with an OS-supported FS. It doesn't use the Hadoop
FileSystem API, so writing to a Hadoop-compatible FS does not work.

Also, it is not suitable to write temporary shuffle data to a distributed
FS; this brings unnecessary overhead. In your case, if you have a lot of
memory on each node, you could use ramfs instead to store shuffle data.

Thanks
Saisai
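
For illustration, the ramfs suggestion could be wired up roughly as in the
Python sketch below, which checks that the scratch directories exist and are
writable and then builds the "spark.local.dir" setting for spark-submit. The
mount point /mnt/ramdisk is a hypothetical example. Note that on YARN this
setting is overridden by the local dirs configured for the NodeManager, so
there the RAM-backed directory would have to be configured on the YARN side
instead.

```python
import os


def local_dir_conf(scratch_dirs):
    """Return spark-submit arguments that set spark.local.dir.

    Raises ValueError if any directory is missing or not writable.
    """
    for d in scratch_dirs:
        if not (os.path.isdir(d) and os.access(d, os.W_OK)):
            raise ValueError("not a writable directory: " + d)
    # spark.local.dir accepts a comma-separated list of directories.
    return ["--conf", "spark.local.dir=" + ",".join(scratch_dirs)]


if __name__ == "__main__":
    # Hypothetical ramfs/tmpfs mount point; adjust to the actual mount.
    try:
        print(local_dir_conf(["/mnt/ramdisk"]))
    except ValueError as err:
        print("scratch dir check failed:", err)
```

Data on a RAM-backed mount competes with executor memory and is lost on
reboot, which is acceptable for temporary shuffle files but worth sizing for.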

On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com <
tony@tendcloud.com> wrote:

> Hi, All,
> When we run Spark on very large data, spark will do shuffle and the
> shuffle data will write to local disk. Because we have limited capacity at
> local disk, the shuffled data will occupied all of the local disk and then
> will be failed.  So is there a way we can write the shuffle spill data to
> HDFS? Or if we introduce alluxio in our system, can the shuffled data write
> to alluxio?
>
> Thanks and Regards,
>
> --
> 阎志涛(Tony)
>
> 北京腾云天下科技有限公司
> -
> ---
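> Email: tony@tendcloud.com
> Phone: 13911815695
> WeChat: zhitao_yan
> QQ: 4707059
> Address: Room 602, Aviation Service Building, Building 2, No. 39 Dongzhimenwai Street, Dongcheng District, Beijing
> Postal code: 100027
> 
> 
> TalkingData.com  - Let the data speak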
>


Re: dynamic allocation in Spark 2.0

2016-08-24 Thread Saisai Shao
It looks like the Spark application is running into an abnormal state. From
the stack, the driver could not send requests to the AM. Can you please
check whether the AM is reachable and whether there are any other exceptions
besides this one?

From my past tests, Spark's dynamic allocation may run into some corner
cases when an NM is gone or restarted. It would be better to check all the
logs (driver, AM and executors) for clues about why this happens. It is hard
to tell the exact reason based only on the exception you pasted above.

On Wed, Aug 24, 2016 at 3:16 PM, Shane Lee 
wrote:

> Hello all,
>
> I am running hadoop 2.6.4 with Spark 2.0 and I have been trying to get
> dynamic allocation to work without success. I was able to get it to work
> with Spark 1.6.1 however.
>
> When I issue the command
> spark-shell --master yarn --deploy-mode client
>
> this is the error I see:
>
> 16/08/24 00:05:40 WARN NettyRpcEndpointRef: Error sending message [message
> = RequestExecutors(1,0,Map())] in 1 attempts
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> at org.apache.spark.rpc.RpcTimeout.org$apache$spark$
> rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:63)
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(
> RpcTimeout.scala:83)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(
> RpcEndpointRef.scala:102)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(
> RpcEndpointRef.scala:78)
> at
> org.apache.spark.scheduler.cluster.YarnSchedulerBackend.
> doRequestTotalExecutors(YarnSchedulerBackend.scala:128)
> at org.apache.spark.scheduler.cluster.
> CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGr
> ainedSchedulerBackend.scala:493)
> at org.apache.spark.SparkContext.requestTotalExecutors(
> SparkContext.scala:1482)
> at org.apache.spark.ExecutorAllocationManager.start(
> ExecutorAllocationManager.scala:235)
> at org.apache.spark.SparkContext$$anonfun$21.apply(
> SparkContext.scala:534)
> at org.apache.spark.SparkContext$$anonfun$21.apply(
> SparkContext.scala:534)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.SparkContext.(SparkContext.scala:534)
> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.
> scala:2256)
> at org.apache.spark.sql.SparkSession$Builder$$anonfun$
> 8.apply(SparkSession.scala:831)
> at org.apache.spark.sql.SparkSession$Builder$$anonfun$
> 8.apply(SparkSession.scala:823)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.SparkSession$Builder.
> getOrCreate(SparkSession.scala:823)
> at org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
> at $line3.$read$$iw$$iw.(:15)
> at $line3.$read$$iw.(:31)
> at $line3.$read.(:33)
> at $line3.$read$.(:37)
> at $line3.$read$.()
> at $line3.$eval$.$print$lzycompute(:7)
> at $line3.$eval$.$print(:6)
> at $line3.$eval.$print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(
> IMain.scala:786)
> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(
> IMain.scala:1047)
> at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$
> loadAndRunReq$1.apply(IMain.sca
> la:638)
> at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$
> loadAndRunReq$1.apply(IMain.sca
> la:637)
> at scala.reflect.internal.util.ScalaClassLoader$class.
> asContext(ScalaClassLoader.scala:31)
> at scala.reflect.internal.util.AbstractFileClassLoader.asContext(
> AbstractFileClassLoader.sca
> la:19)
> at scala.tools.nsc.interpreter.IMain$WrappedRequest.
> loadAndRunReq(IMain.scala:637)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
> at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(
> ILoop.scala:807)
> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
> at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
> at org.apache.spark.repl.SparkILoop$$anonfun$
> initializeSpark$1.apply$mcV$sp(SparkILoop.scala
> :38)
> at 

Re: Apache Spark toDebugString producing different output for python and scala repl

2016-08-15 Thread Saisai Shao
The Python and Scala RDD APIs are implemented slightly differently, so the
difference in the RDD lineage you printed is expected.

On Tue, Aug 16, 2016 at 10:58 AM, DEEPAK SHARMA  wrote:

> Hi All,
>
>
> Below is a small piece of code in the Scala and Python REPLs in Apache
> Spark. However, I am getting different output in the two languages when I
> execute toDebugString. I am using the Cloudera QuickStart VM.
>
> PYTHON
>
> rdd2 = sc.textFile('file:/home/training/training_materials/
> data/frostroad.txt').map(lambda x:x.upper()).filter(lambda x : 'THE' in x)
>
> print rdd2.toDebugString()
> (1) PythonRDD[56] at RDD at PythonRDD.scala:42 []
>  |  file:/home/training/training_materials/data/frostroad.txt 
> MapPartitionsRDD[55] at textFile at NativeMethodAccessorImpl.java:-2 []
>  |  file:/home/training/training_materials/data/frostroad.txt HadoopRDD[54] 
> at textFile at ..
>
> SCALA
>
>  val rdd2 = 
> sc.textFile("file:/home/training/training_materials/data/frostroad.txt").map(x
>  => x.toUpperCase()).filter(x => x.contains("THE"))
>
>
>
> rdd2.toDebugString
> res1: String = (1) MapPartitionsRDD[3] at filter at :21 []
>  |  MapPartitionsRDD[2] at map at :21 []
>  |  file:/home/training/training_materials/data/frostroad.txt 
> MapPartitionsRDD[1] at textFile at :21 []
>  |  file:/home/training/training_materials/data/frostroad.txt HadoopRDD[0] at 
> textFile at <
>
>
> Also, one of the Cloudera slides says that the default number of
> partitions is 2; however, it is 1 (looking at the output of toDebugString).
>
>
> Appreciate any help.
>
>
> Thanks
>
> Deepak Sharma
>


Re: submitting spark job with kerberized Hadoop issue

2016-08-07 Thread Saisai Shao
1. Standalone mode doesn't support accessing kerberized Hadoop, simply
because it lacks a mechanism to distribute delegation tokens via the cluster
manager.
2. For the HBase token fetching failure, I think you have to run kinit to
generate a TGT before starting the Spark application (
http://hbase.apache.org/0.94/book/security.html).
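
As a small illustration of the second point, a launcher script could check
for a usable ticket before calling spark-submit. The Python sketch below
relies on "klist -s", which exits with status 0 only when the credential
cache is readable and not expired. The wrapper function and the injectable
"run" parameter are my own illustration, not part of Spark or HBase.

```python
import subprocess


def has_valid_tgt(run=subprocess.call):
    """Return True if `klist -s` reports a readable, unexpired credential cache."""
    try:
        return run(["klist", "-s"]) == 0
    except OSError:
        # klist is not installed or not on PATH.
        return False


if __name__ == "__main__":
    if has_valid_tgt():
        print("Kerberos ticket found; safe to launch the Spark application.")
    else:
        print("No valid Kerberos ticket; run kinit first.")
```

This only verifies the ticket on the submitting machine; it says nothing
about whether the executors can authenticate to HBase.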

On Mon, Aug 8, 2016 at 12:05 AM, Aneela Saleem 
wrote:

> Thanks Wojciech and Jacek!
>
> I tried with Spark on Yarn with kerberized cluster it works fine now. But
> now when i try to access Hbase through spark i get the following error:
>
> 2016-08-07 20:43:57,617 WARN  
> [hconnection-0x24b5fa45-metaLookup-shared--pool2-t1] ipc.RpcClientImpl: 
> Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2016-08-07 20:43:57,619 ERROR 
> [hconnection-0x24b5fa45-metaLookup-shared--pool2-t1] ipc.RpcClientImpl: SASL 
> authentication failed. The most likely cause is missing or invalid 
> credentials. Consider 'kinit'.
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
>   at 
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>   at 
> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:617)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$700(RpcClientImpl.java:162)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:743)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:740)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:740)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:906)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:873)
>   at 
> org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1241)
>   at 
> org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:227)
>   at 
> org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:336)
>   at 
> org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:34094)
>   at 
> org.apache.hadoop.hbase.client.ClientSmallScanner$SmallScannerCallable.call(ClientSmallScanner.java:201)
>   at 
> org.apache.hadoop.hbase.client.ClientSmallScanner$SmallScannerCallable.call(ClientSmallScanner.java:180)
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:210)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:360)
>   at 
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:334)
>   at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:136)
>   at 
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:65)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: GSSException: No valid credentials provided (Mechanism level: 
> Failed to find any Kerberos tgt)
>   at 
> sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
>   at 
> sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
>   at 
> sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
>   at 
> sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
>   at 
> sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
>   at 
> sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
>   at 
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
>   ... 25 more
>
>


Re: spark 2.0.0 - how to build an uber-jar?

2016-08-03 Thread Saisai Shao
I guess you're referring to the Spark assembly uber jar. In Spark 2.0
there's no uber jar; instead there's a jars folder which contains all the
jars required at run time. This is transparent to the end user: the way to
submit a Spark application is still the same.

On Wed, Aug 3, 2016 at 4:51 PM, Mich Talebzadeh 
wrote:

> Just to clarify are you building Spark 2 from source downloaded?
>
> Or are you referring to building a uber jar file  using your code and mvn
> to submit with spark-submit etc?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 3 August 2016 at 09:43, lev  wrote:
>
>> hi,
>>
>> in spark 1.5, to build an uber-jar,
>> I would just compile the code with:
>> mvn ... package
>> and that will create one big jar with all the dependencies.
>>
>> when trying to do the same with spark 2.0, I'm getting a tar.gz file
>> instead.
>>
>> this is the full command I'm using:
>> mvn -Pyarn -Phive -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.7 -DskipTests
>> -Phive-thriftserver -Dscala-2.10 -Pbigtop-dist clean package
>>
>> how can I create the uber-jar?
>>
>> thanks.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-2-0-0-how-to-build-an-uber-jar-tp27463.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Spark on yarn, only 1 or 2 vcores getting allocated to the containers getting created.

2016-08-03 Thread Saisai Shao
Using the dominant resource calculator instead of the default resource
calculator will give you the vcores you expect. By default YARN does not
take CPU cores into account as a resource, so you will always see 1 vcore
no matter how many cores you set in Spark.
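
For reference, here is a short Python sketch that renders the property that
would switch the calculator. It assumes the CapacityScheduler is in use, in
which case the property belongs in capacity-scheduler.xml; other schedulers
are configured differently. The helper function name is just for
illustration.

```python
import xml.etree.ElementTree as ET


def resource_calculator_property():
    """Render the <property> element that selects DominantResourceCalculator."""
    prop = ET.Element("property")
    ET.SubElement(prop, "name").text = "yarn.scheduler.capacity.resource-calculator"
    ET.SubElement(prop, "value").text = (
        "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
    )
    return ET.tostring(prop, encoding="unicode")


if __name__ == "__main__":
    # Paste the printed element into capacity-scheduler.xml and restart the RM.
    print(resource_calculator_property())
```

With this calculator the scheduler accounts for both memory and vcores, so
the --executor-cores value should show up in the container's vcore count.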

On Wed, Aug 3, 2016 at 12:11 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> I am trying to run a spark job using yarn, and i specify --executor-cores
> value as 20.
> But when i go check the "nodes of the cluster" page in
> http://hostname:8088/cluster/nodes then i see 4 containers getting
> created on each of the node in cluster.
>
> But can only see 1 vcore getting assigned for each container, even when i
> specify --executor-cores 20 while submitting job using spark-submit.
>
> yarn-site.xml
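> <property>
>   <name>yarn.scheduler.maximum-allocation-mb</name>
>   <value>6</value>
> </property>
> <property>
>   <name>yarn.scheduler.minimum-allocation-vcores</name>
>   <value>1</value>
> </property>
> <property>
>   <name>yarn.scheduler.maximum-allocation-vcores</name>
>   <value>40</value>
> </property>
> <property>
>   <name>yarn.nodemanager.resource.memory-mb</name>
>   <value>7</value>
> </property>
> <property>
>   <name>yarn.nodemanager.resource.cpu-vcores</name>
>   <value>20</value>
> </property>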
>
>
> Did anyone face the same issue??
>
> Regards,
> Satyajit.
>


Re: Getting error, when I do df.show()

2016-08-01 Thread Saisai Shao
>
> java.lang.NoClassDefFoundError: spray/json/JsonReader
>
> at
> com.memsql.spark.pushdown.MemSQLPhysicalRDD$.fromAbstractQueryTree(MemSQLPhysicalRDD.scala:95)
>
> at
> com.memsql.spark.pushdown.MemSQLPushdownStrategy.apply(MemSQLPushdownStrategy.scala:49)
>

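From the stack trace this looks like a MemSQL problem. Some jars are probably
missing from the classpath when you use the MemSQL data source; the missing
class spray/json/JsonReader comes from the spray-json library.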

On Mon, Aug 1, 2016 at 4:22 PM, Subhajit Purkayastha 
wrote:

> I am getting this error in the spark-shell when I do df.show(). Which jar
> file do I need to download to fix this error?
>
>
>
> df.show()
>
>
>
> Error
>
>
>
> scala> val df = msc.sql(query)
>
> df: org.apache.spark.sql.DataFrame = [id: int, name: string]
>
>
>
> scala> df.show()
>
> java.lang.NoClassDefFoundError: spray/json/JsonReader
>
> at
> com.memsql.spark.pushdown.MemSQLPhysicalRDD$.fromAbstractQueryTree(MemSQLPhysicalRDD.scala:95)
>
> at
> com.memsql.spark.pushdown.MemSQLPushdownStrategy.apply(MemSQLPushdownStrategy.scala:49)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>
> at
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:374)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>
> at
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
>
> at
> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
>
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
>
> at
> org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
>
> at
> org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
>
> at
> org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
>
> at
> org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
>
> at
> org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
>
> at org.apache.spark.sql.DataFrame.show(DataFrame.scala:401)
>
> at org.apache.spark.sql.DataFrame.show(DataFrame.scala:362)
>
> at org.apache.spark.sql.DataFrame.show(DataFrame.scala:370)
>
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:48)
>
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
>
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
>
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:57)
>
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:59)
>
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61)
>
>
>
>
>


Re: yarn.exceptions.ApplicationAttemptNotFoundException when trying to shut down spark application via yarn application --kill

2016-07-26 Thread Saisai Shao
Some useful information can be found here (
https://issues.apache.org/jira/browse/YARN-1842), though personally I
haven't run into this problem before.

Thanks
Saisai

On Tue, Jul 26, 2016 at 2:21 PM, Yu Wei  wrote:

> Hi guys,
>
>
> I tried to shut down a Spark application via "yarn application --kill".
>
> I run the Spark application in yarn-cluster mode on my laptop.
>
> I found the following exception in the log.
>
> org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: 
> Application attempt appattempt_1469512095518_0002_01 doesn't exist in 
> ApplicationMasterService cache.
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:439)
>   at 
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
>   at 
> org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
>
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
>   at 
> org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:101)
>   at 
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
>   at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy16.allocate(Unknown Source)
>   at 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:277)
>   at 
> org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:225)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:384)
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException):
>  Application attempt appattempt_1469512095518_0002_01 doesn't exist in 
> ApplicationMasterService cache.
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:439)
>   at 
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
>   at 
> org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>
> Any hints?
>
>
> Thanks in advance,
>
>
> Jared, (韦煜)
> Software developer
> Interested in open source software, big data, Linux
>


Re: How to submit app in cluster mode? port 7077 or 6066

2016-07-21 Thread Saisai Shao
I think both 6066 and 7077 work. 6066 uses the REST way to
submit the application, while 7077 is the legacy way. From the user's
perspective it should be transparent, and there is no need to worry about the
difference.


   - *URL:* spark://hw12100.local:7077
   - *REST URL:* spark://hw12100.local:6066 (cluster mode)


Thanks
Saisai

On Fri, Jul 22, 2016 at 6:44 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I have some very long-lived streaming apps. They have been running for
> several months. I wonder if something has changed recently? I first started
> working with spark-1.3. I am using the standalone cluster manager. The
> way I would submit my app to run in cluster mode was via port 6066.
>
>
> Looking at spark-1.6, it seems like we are supposed to use port 7077
> and the new --deploy-mode argument:
>
>
> http://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit
>
>
>- --deploy-mode: Whether to deploy your driver on the worker nodes
>(cluster) or locally as an external client (client) (default: client)
>
>
> Can anyone confirm this? It took me a very long time to figure out how to
> get things to run in cluster mode.
>
> Thanks
>
> Andy
>


Re: scala.MatchError on stand-alone cluster mode

2016-07-15 Thread Saisai Shao
The exception in the stack trace is thrown from your own code:

Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
[Ljava.lang.String;)
at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)

I think you should debug your code; this is probably not a Spark
problem.
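
One thing that may be worth checking, going by the quoted job code: a
`val Array(a, b, ...) = args` pattern only matches an array of exactly that
length, while an `args.length < 9` guard lets longer argument lists through.
This is a guess at what line 29 does, not a confirmed diagnosis. A small
standalone sketch (all names are hypothetical, and two arguments stand in for
nine) of the failure and of a safer match:

```scala
object ArgsDemo {
  // Returns Right((in, out)) only when exactly two arguments are supplied;
  // any other length becomes an error message instead of a MatchError.
  def parse(args: Array[String]): Either[String, (String, String)] = args match {
    case Array(in, out) => Right((in, out))
    case other          => Left(s"expected 2 arguments, got ${other.length}")
  }

  def main(cmdArgs: Array[String]): Unit = {
    val three = Array("a", "b", "c")

    // The destructuring pattern compiles, but throws scala.MatchError at
    // runtime because the array has three elements instead of two.
    val attempt = scala.util.Try { val Array(x, y) = three; (x, y) }
    println(attempt.isFailure)

    // The explicit match reports the problem instead of throwing.
    println(parse(three))
    println(parse(Array("a", "b")))
  }
}
```

If this is the cause, comparing the argument list the driver actually receives
in cluster mode with the one passed in client mode would confirm it.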

On Fri, Jul 15, 2016 at 3:17 PM, Mekal Zheng  wrote:

> Hi,
>
> I have a Spark Streaming job written in Scala that runs well in local
> and client mode, but when I submit it in cluster mode, the driver reports
> the error shown below.
> Does anyone know what is wrong here?
> Please help!
>
> The job code follows the log.
>
> 16/07/14 17:28:21 DEBUG ByteBufUtil:
> -Dio.netty.threadLocalDirectBufferSize: 65536
> 16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo,
> 0:0:0:0:0:0:0:1%lo)
> 16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
> 16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port
> :43492
> 16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on
> port 43492.
> 16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://
> Worker@172.20.130.98:23933
> 16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection to
> /172.20.130.98:23933
> Exception in thread "main" java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
> at
> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
> [Ljava.lang.String;)
> at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
> at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
> ... 6 more
>
> ==
> Job CODE:
>
> object LogAggregator {
>
>   val batchDuration = Seconds(5)
>
>   def main(args: Array[String]) {
>
>     val usage =
>       """Usage: LogAggregator
>         |  logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field must have both name and role
>         |  logFormat.role: can be key|avg|enum|sum|ignore
>       """.stripMargin
>
>     if (args.length < 9) {
>       System.err.println(usage)
>       System.exit(1)
>     }
>
>     val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator, batchDuration, destType, destPath) = args
>
>     println("Start streaming calculation...")
>
>     val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
>     val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))
>
>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>
>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>
>     val logFields = logFormat.split(",").map(field => {
>       val fld = field.split(":")
>       if (fld.size != 2) {
>         System.err.println("Wrong parameters for logFormat!\n")
>         System.err.println(usage)
>         System.exit(1)
>       }
>       // TODO: ensure the field has both 'name' and 'role'
>       new LogField(fld(0), fld(1))
>     })
>
>     val keyFields = logFields.filter(logFieldName => {
>       logFieldName.role == "key"
>     })
>     val keys = keyFields.map(key => {
>       key.name
>     })
>
>     val logsByKey = lines.map(line => {
>       val log = new Log(logFields, line, logSeparator)
>       log.toMap
>     }).filter(log => log.nonEmpty).map(log => {
>       val keys = keyFields.map(logField => {
>         log(logField.name).value
>       })
>
>       val key = keys.reduce((key1, key2) => {
>         key1.asInstanceOf[String] + key2.asInstanceOf[String]
>       })
>
>       val fullLog = log + ("count" -> new LogSegment("sum", 1))
>
>       (key, fullLog)
>     })
>
>
>     val aggResults = logsByKey.reduceByKey((log_a, log_b) => {
>
>       log_a.map(logField => {
>         val logFieldName = logField._1
>         val logSegment_a = logField._2
>         val logSegment_b = log_b(logFieldName)
>
>         val segValue = logSegment_a.role match {
>           case "avg" => {
>             logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
>           }
>           case "sum" => {
>             logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
>           }
>           case "enum" => {
>             val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
>             val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
>             list_a ++ list_b
>           }
>           case _ => logSegment_a.value
>         }
> 

Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

2016-07-06 Thread Saisai Shao
DStream.print() collects some of the data to the driver and displays it; please
see the implementation of DStream.print() below.

RDD.take() also collects some of the data to the driver.

Normally the behavior should be consistent between cluster and local mode, so
please look for the root cause of this problem elsewhere, such as the MQTT
connection or something else.

def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("---")
      println(s"Time: $time")
      println("---")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}


On Wed, Jul 6, 2016 at 9:17 PM, Yu Wei  wrote:

> How about DStream.print().
>
> Does it invoke collect before print on driver?
> --
> *From:* Sean Owen 
> *Sent:* Wednesday, July 6, 2016 8:20:36 PM
> *To:* Rabin Banerjee
> *Cc:* Yu Wei; user@spark.apache.org
> *Subject:* Re: It seemed JavaDStream.print() did not work when launching
> via yarn on a single node
>
> dstream.foreachRDD(_.collect.foreach(println))
>
> On Wed, Jul 6, 2016 at 1:19 PM, Rabin Banerjee
>  wrote:
> > Collect will help then . May be something like this,
> > foreachRDD( rdd => { for(item <- rdd.collect().toArray) { println(item);
> }
> > })
> >
>


Re: spark local dir to HDFS ?

2016-07-05 Thread Saisai Shao
Configuring the local dirs on HDFS does not work. Local dirs are mainly
used for spilled data and shuffle files, and HDFS is not suitable for that.
If you run into a capacity problem, you can configure multiple dirs
located on different mounted disks.
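
As a rough sketch of the multiple-directory setup: spark.local.dir accepts a
comma-separated list of local paths. The mount points and object name below
are hypothetical. Note also that on YARN this setting is overridden by the
node manager's local dirs (the LOCAL_DIRS environment variable), so there the
change belongs in the YARN configuration instead.

```scala
import org.apache.spark.SparkConf

object LocalDirsExample {
  // Hypothetical mount points; replace with directories on real local disks.
  val scratchDirs: Seq[String] = Seq("/mnt/disk1/spark_tmp", "/mnt/disk2/spark_tmp")

  // spark.local.dir takes a comma-separated list of local filesystem paths,
  // not hdfs:// URIs.
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("local-dirs-example")
      .set("spark.local.dir", scratchDirs.mkString(","))

  def main(args: Array[String]): Unit =
    println(buildConf().get("spark.local.dir"))
}
```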

On Wed, Jul 6, 2016 at 9:05 AM, Sri  wrote:

> Hi ,
>
> It is a space issue: we are currently using /tmp, and at the moment we don't
> have any mounted location set up yet.
>
> Thanks
> Sri
>
>
> Sent from my iPhone
>
> On 5 Jul 2016, at 17:22, Jeff Zhang  wrote:
>
> Any reason why you want to set this on hdfs ?
>
> On Tue, Jul 5, 2016 at 3:47 PM, kali.tumm...@gmail.com <
> kali.tumm...@gmail.com> wrote:
>
>> Hi All,
>>
>> Can I set spark.local.dir to an HDFS location instead of the /tmp folder?
>>
>> I tried setting the temp folder to HDFS but it didn't work. Can
>> spark.local.dir write to HDFS?
>>
>> .set("spark.local.dir","hdfs://namednode/spark_tmp/")
>>
>>
>> 16/07/05 15:35:47 ERROR DiskBlockManager: Failed to create local dir in
>> hdfs://namenode/spark_tmp/. Ignoring this directory.
>> java.io.IOException: Failed to create a temp directory (under
>> hdfs://namenode/spark_tmp/) after 10 attempts!
>>
>>
>> Thanks
>> Sri
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-local-dir-to-HDFS-tp27291.html
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>


Re: deploy-mode flag in spark-sql cli

2016-06-29 Thread Saisai Shao
I think you cannot use the SQL client in cluster mode. The same goes for
spark-shell and pyspark, which have a REPL: all of these applications can only
be started in client deploy mode.

On Thu, Jun 30, 2016 at 12:46 PM, Mich Talebzadeh  wrote:

> Hi,
>
> When you use spark-shell or for that matter spark-sql, you are starting
> spark-submit under the bonnet. These two shells are created to make life
> easier to work on Spark.
>
>
> However, if you look at what $SPARK_HOME/bin/spark-sql does in the
> script, you will notice my point:
>
>
>
> exec "${SPARK_HOME}"/bin/spark-submit --class
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
>
>  So that is basically spark-submit JVM
>
>
>
> Since it is using spark-submit it takes all the parameters related to
> spark-submit. You can test it using your own customised shell script with
> parameters passed.
>
>
> ${SPARK_HOME}/bin/spark-submit \
> --driver-memory xG \
> --num-executors n \
> --executor-memory xG \
> --executor-cores m \
> --master yarn \
> --deploy-mode cluster \
> --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
>
>
> Does your version of spark support cluster mode?
>
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 30 June 2016 at 05:16, Huang Meilong  wrote:
>
>> Hello,
>>
>>
>> I added deploy-mode flag in spark-sql cli like this:
>>
>> $ spark-sql --deploy-mode cluster --master yarn -e "select * from mx"
>>
>>
>> It showed error saying "Cluster deploy mode is not applicable to Spark
>> SQL shell", but "spark-sql --help" shows "--deploy-mode" option. Is this
>> a bug?
>>
>
>


Re: problem running spark with yarn-client not using spark-submit

2016-06-26 Thread Saisai Shao
It means several jars are missing in the YARN container environment. If you
want to submit your application some other way than spark-submit, you have to
take care of all the environment setup yourself. Since we don't know how your
Java web service is implemented, it is hard to give more specific advice.
Basically the problem is that the Spark jar is missing on the AM side, so you
have to put the Spark jars into the distributed cache ahead of time.

On Mon, Jun 27, 2016 at 11:19 AM,  wrote:

> Hi,
>
> Thanks for reply. It's a java web service resides in a jboss container.
>
> HY Chung
> Best regards,
>
> S.Y. Chung 鍾學毅
> F14MITD
> Taiwan Semiconductor Manufacturing Company, Ltd.
> Tel: 06-5056688 Ext: 734-6325
>
>
> From: Mich Talebzadeh
> Date: 2016/06/24 08:05 PM
> To: sychu...@tsmc.com
> Cc: "user @spark"
> Subject: [Spam][SMG] Re: problem running spark with yarn-client not using spark-submit
>
>
>
> Hi,
> Trying to run spark with yarn-client not using spark-submit here
>
> what are you using to submit the job? spark-shell, spark-sql  or anything
> else
>
>
> Dr Mich Talebzadeh
>
> LinkedIn
>
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 24 June 2016 at 12:01,  wrote:
>
>   Hello guys,
>
>   Trying to run Spark with yarn-client without using spark-submit here, but the
>   jobs keep failing while the AM is launching executors.
>   The error collected by YARN is below.
>   It looks like some environment setting is missing?
>   Could someone help me out with this?
>
>   Thanks  in advance!
>   HY Chung
>
>   Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
>   Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
>       at java.lang.ClassLoader.defineClass1(Native Method)
>       at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
>       at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>       at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
>       at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:674)
>       at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
>   Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>  

Re: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

2016-06-22 Thread Saisai Shao
spark.yarn.jar (none) The location of the Spark jar file, in case
overriding the default location is desired. By default, Spark on YARN will
use a Spark jar installed locally, but the Spark jar can also be in a
world-readable location on HDFS. This allows YARN to cache it on nodes so
that it doesn't need to be distributed each time an application runs. To
point to a jar on HDFS, for example, set this configuration to
hdfs:///some/path.

spark.yarn.jar is meant for the Spark runtime jar, i.e. the Spark
assembly jar, not the application jar (the examples jar). In your
case you uploaded the examples jar to HDFS, which does not bundle the Spark
runtime classes, so ExecutorLauncher cannot be found.
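
A minimal sketch of what that could look like in a Spark 1.x application,
assuming the assembly jar has been uploaded to a hypothetical HDFS path (the
jar file name and object name are assumptions too; the same key can go into
spark-defaults.conf instead):

```scala
import org.apache.spark.SparkConf

object YarnJarExample {
  // Hypothetical HDFS location; it must hold the Spark assembly jar,
  // not the application jar.
  val assemblyJar = "hdfs:///some/path/spark-assembly-1.6.1-hadoop2.6.0.jar"

  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("yarn-jar-example")
      // spark.yarn.jar tells Spark on YARN where the runtime jar lives (Spark 1.x setting).
      .set("spark.yarn.jar", assemblyJar)

  def main(args: Array[String]): Unit =
    println(buildConf().get("spark.yarn.jar"))
}
```

The application jar itself is still passed to spark-submit as usual.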

Thanks
Saisai

On Wed, Jun 22, 2016 at 2:10 PM, 另一片天 <958943...@qq.com> wrote:

> shihj@master:/usr/local/spark/spark-1.6.1-bin-hadoop2.6$
> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
> yarn-client --driver-memory 512m --num-executors 2 --executor-memory 512m
> --executor-cores 2
> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar 10
> Warning: Local jar
> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar does not exist,
> skipping.
> java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> get error at once
> -- Original Message --
> *From:* "Yash Sharma";;
> *Sent:* Wednesday, June 22, 2016, 2:04 PM
> *To:* "另一片天"<958943...@qq.com>;
> *Cc:* "user";
> *Subject:* Re: Could not find or load main class
> org.apache.spark.deploy.yarn.ExecutorLauncher
>
> How about supplying the jar directly in spark submit -
>
> ./bin/spark-submit \
>> --class org.apache.spark.examples.SparkPi \
>> --master yarn-client \
>> --driver-memory 512m \
>> --num-executors 2 \
>> --executor-memory 512m \
>> --executor-cores 2 \
>> /user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
>
>
> On Wed, Jun 22, 2016 at 3:59 PM, 另一片天 <958943...@qq.com> wrote:
>
>> I configured this parameter in spark-defaults.conf:
>> spark.yarn.jar
>> hdfs://master:9000/user/shihj/spark_lib/spark-examples-1.6.1-hadoop2.6.0.jar
>>
>> then ./bin/spark-submit --class org.apache.spark.examples.SparkPi
>> --master yarn-client --driver-memory 512m --num-executors 2
>> --executor-memory 512m --executor-cores 210:
>>
>>
>>
>>- Error: Could not find or load main class
>>org.apache.spark.deploy.yarn.ExecutorLauncher
>>
>> But if I don't set that parameter, there is no error. Why? Is that parameter
>> only meant to avoid uploading the resource file (jar package)?
>>
>
>


YARN Application Timeline service with Spark 2.0.0 issue

2016-06-17 Thread Saisai Shao
Hi Community,

In Spark 2.0.0 we upgraded to Jersey 2 (
https://issues.apache.org/jira/browse/SPARK-12154) instead of Jersey 1.9,
while Hadoop as a whole still sticks to the old version. This causes
problems when the YARN timeline service is enabled (
https://issues.apache.org/jira/browse/SPARK-15343): any Spark 2
application running on YARN with the timeline service enabled will fail.

Just a heads up: if you happen to run into this issue, you can stop Spark on
YARN from using the timeline service by setting
"spark.hadoop.yarn.timeline-service.enabled = false". We will also fix
this on the YARN side.
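
For an application that builds its own configuration, the workaround might be
sketched as follows (the object and app names are hypothetical; passing the
same key with --conf on spark-submit should be equivalent):

```scala
import org.apache.spark.SparkConf

object DisableTimelineService {
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("timeline-service-workaround")
      // Keys with the spark.hadoop. prefix are forwarded to the Hadoop/YARN configuration.
      .set("spark.hadoop.yarn.timeline-service.enabled", "false")

  def main(args: Array[String]): Unit =
    println(buildConf().get("spark.hadoop.yarn.timeline-service.enabled"))
}
```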

Thanks
Saisai


Re: Map tuple to case class in Dataset

2016-05-31 Thread Saisai Shao
It works fine in my local test with the latest master, so maybe this bug has
already been fixed.

On Wed, Jun 1, 2016 at 7:29 AM, Michael Armbrust 
wrote:

> Version of Spark? What is the exception?
>
> On Tue, May 31, 2016 at 4:17 PM, Tim Gautier 
> wrote:
>
>> How should I go about mapping from say a Dataset[(Int,Int)] to a
>> Dataset[]?
>>
>> I tried to use a map, but it throws exceptions:
>>
>> case class Test(a: Int)
>> Seq(1,2).toDS.map(t => Test(t)).show
>>
>> Thanks,
>> Tim
>>
>
>


Re: duplicate jar problem in yarn-cluster mode

2016-05-17 Thread Saisai Shao
I think it is already fixed, if your problem is exactly the one described in
this JIRA (https://issues.apache.org/jira/browse/SPARK-14423).

Thanks
Jerry

On Wed, May 18, 2016 at 2:46 AM, satish saley 
wrote:

> Hello,
> I am executing a simple code with yarn-cluster
>
> --master
> yarn-cluster
> --name
> Spark-FileCopy
> --class
> my.example.SparkFileCopy
> --properties-file
> spark-defaults.conf
> --queue
> saleyq
> --executor-memory
> 1G
> --driver-memory
> 1G
> --conf
> spark.john.snow.is.back=true
> --jars
> hdfs://myclusternn.com:8020/tmp/saley/examples/examples-new.jar
> --conf
> spark.executor.extraClassPath=examples-new.jar
> --conf
> spark.driver.extraClassPath=examples-new.jar
> --verbose
> examples-new.jar
> hdfs://myclusternn.com:8020/tmp/saley/examples/input-data/text/data.txt
> hdfs://myclusternn.com:8020/tmp/saley/examples/output-data/spark
>
>
> I am facing
>
> Resource hdfs://
> myclusternn.com/user/saley/.sparkStaging/application_5181/examples-new.jar
> changed on src filesystem (expected 1463440119942, was 1463440119989
> java.io.IOException: Resource hdfs://
> myclusternn.com/user/saley/.sparkStaging/application_5181/examples-new.jar
> changed on src filesystem (expected 1463440119942, was 1463440119989
>
> I see a jira for this
> https://issues.apache.org/jira/browse/SPARK-1921
>
> Is this yet to be fixed or fixed as part of another jira and need some
> additional config?
>


Re: Re: How to change output mode to Update

2016-05-17 Thread Saisai Shao
> .mode(SaveMode.Overwrite)

From my understanding, mode() is not supported for continuous queries.

def mode(saveMode: SaveMode): DataFrameWriter = {
  // mode() is used for non-continuous queries
  // outputMode() is used for continuous queries
  assertNotStreaming("mode() can only be called on non-continuous queries")
  this.mode = saveMode
  this
}


On Wed, May 18, 2016 at 12:25 PM, Todd  wrote:

> Thanks Ted.
>
> I didn't try, but I think SaveMode and OutputMode are different things.
> Currently, the Spark code contains two output modes, Append and Update.
> Append is the default mode, but it looks like there is no way to change to
> Update.
>
> Take a look at DataFrameWriter#startQuery
>
> Thanks.
>
>
>
>
>
>
> At 2016-05-18 12:10:11, "Ted Yu"  wrote:
>
> Have you tried adding:
>
> .mode(SaveMode.Overwrite)
>
> On Tue, May 17, 2016 at 8:55 PM, Todd  wrote:
>
>> scala> records.groupBy("name").count().write.trigger(ProcessingTime("30
>> seconds")).option("checkpointLocation",
>> "file:///home/hadoop/jsoncheckpoint").startStream("file:///home/hadoop/jsonresult")
>> org.apache.spark.sql.AnalysisException: Aggregations are not supported on
>> streaming DataFrames/Datasets in Append output mode. Consider changing
>> output mode to Update.;
>>   at
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:142)
>>   at
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForStreaming$1.apply(UnsupportedOperationChecker.scala:59)
>>   at
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForStreaming$1.apply(UnsupportedOperationChecker.scala:46)
>>   at
>> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
>>   at
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:46)
>>   at
>> org.apache.spark.sql.ContinuousQueryManager.startQuery(ContinuousQueryManager.scala:190)
>>   at
>> org.apache.spark.sql.DataFrameWriter.startStream(DataFrameWriter.scala:351)
>>   at
>> org.apache.spark.sql.DataFrameWriter.startStream(DataFrameWriter.scala:279)
>>
>>
>> I skimmed the Spark code, and it looks like there is no way to change the
>> output mode to Update?
>>
>
>


Re: How to use Kafka as data source for Structured Streaming

2016-05-17 Thread Saisai Shao
It is not supported yet; currently only the file stream source is supported.

Thanks
Jerry

On Wed, May 18, 2016 at 10:14 AM, Todd  wrote:

> Hi,
> I am wondering whether Structured Streaming supports Kafka as a data source.
> I skimmed the source code (mainly the parts related to the DataSourceRegister
> trait) and didn't find anything about a Kafka data source.
> If
> Thanks.
>
>
>
>
>


Re: Re: spark uploading resource error

2016-05-10 Thread Saisai Shao
The code is in Client.scala under the yarn sub-module (see the link below).
You may need to check what the vendor changed relative to the Apache Spark
code.

https://github.com/apache/spark/blob/branch-1.3/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Thanks
Saisai

On Tue, May 10, 2016 at 4:17 PM, 朱旻 <z...@126.com> wrote:

>
>
> It is a product sold by Huawei called FusionInsight. It says Spark is
> 1.3 with Hadoop 2.7.1.
>
> Where can I find the code or config file which defines the files to be
> uploaded?
>
>
> At 2016-05-10 16:06:05, "Saisai Shao" <sai.sai.s...@gmail.com> wrote:
>
> What version of Spark are you using? From my understanding, there's
> no code in yarn#client that uploads "__hadoop_conf__" into the distributed cache.
>
>
>
> On Tue, May 10, 2016 at 3:51 PM, 朱旻 <z...@126.com> wrote:
>
>> hi all:
>> I found a problem using Spark.
>> When I use spark-submit to launch a task, it works:
>>
>> *spark-submit --num-executors 8 --executor-memory 8G --class
>> com.icbc.nss.spark.PfsjnlSplit  --master yarn-cluster
>> /home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar
>> /user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join
>> /user/nss/output_join2*
>>
>> but when i use the command created by spark-class  as below
>>
>> */home/nssbatch/huaweiclient/hadoopclient/JDK/jdk/bin/java
>> -Djava.security.krb5.conf=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/var/krb5kdc/krb5.conf
>> -Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com
>> -Djava.security.auth.login.config=/home/nssbatch/huaweiclient/hadoopclient/Spark/adapter/client/controller/jaas.conf
>> -Dzookeeper.kinit=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/bin/kinit
>> -cp
>> /home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/spark-assembly-1.3.0-hadoop2.7.1.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-core-3.2.10.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-rdbms-3.2.9.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-api-jdo-3.2.6.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Yarn/config/
>> org.apache.spark.deploy.SparkSubmit --master yarn-cluster --class
>> com.icbc.nss.spark.PfsjnlSplit --num-executors 8 --executor-memory 8G
>> /home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar
>> /user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join
>> /user/nss/output_join2*
>>
>> it didn't work.
>> i compare the log.and found that:
>>
>> 16/05/10 22:23:24 INFO Client: Uploading resource
>> file:/tmp/spark-a4457754-7183-44ce-bd0d-32a071757c92/__hadoop_conf__4372868703234608846.zip
>> ->
>> hdfs://hacluster/user/nss/.sparkStaging/application_1462442311990_0057/__hadoop_conf__4372868703234608846.zip
>>
>>
>> the conf_file uploaded into hdfs was different.
>>
>> why is this happened?
>> where can i find the resource file to be uploading?
>>
>


Re: spark uploading resource error

2016-05-10 Thread Saisai Shao
Which version of Spark are you using? From my understanding, there's no
code in yarn#client that uploads "__hadoop_conf__" into the distributed cache.



On Tue, May 10, 2016 at 3:51 PM, 朱旻  wrote:

> hi all:
> I found a problem using spark .
> WHEN I use spark-submit to launch a task. it works
>
> *spark-submit --num-executors 8 --executor-memory 8G --class
> com.icbc.nss.spark.PfsjnlSplit  --master yarn-cluster
> /home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar
> /user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join
> /user/nss/output_join2*
>
> but when i use the command created by spark-class  as below
>
> */home/nssbatch/huaweiclient/hadoopclient/JDK/jdk/bin/java
> -Djava.security.krb5.conf=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/var/krb5kdc/krb5.conf
> -Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com
> -Djava.security.auth.login.config=/home/nssbatch/huaweiclient/hadoopclient/Spark/adapter/client/controller/jaas.conf
> -Dzookeeper.kinit=/home/nssbatch/huaweiclient/hadoopclient/KrbClient/kerberos/bin/kinit
> -cp
> /home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/spark-assembly-1.3.0-hadoop2.7.1.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-core-3.2.10.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-rdbms-3.2.9.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/lib/datanucleus-api-jdo-3.2.6.jar:/home/nssbatch/huaweiclient/hadoopclient/Spark/spark/conf/:/home/nssbatch/huaweiclient/hadoopclient/Yarn/config/
> org.apache.spark.deploy.SparkSubmit --master yarn-cluster --class
> com.icbc.nss.spark.PfsjnlSplit --num-executors 8 --executor-memory 8G
> /home/nssbatch/nss_schedual/jar/SparkBigtableJoinSqlJava.jar
> /user/nss/nss-20151018-pfsjnl-004_024.txt /user/nss/output_join
> /user/nss/output_join2*
>
> it didn't work.
> i compare the log.and found that:
>
> 16/05/10 22:23:24 INFO Client: Uploading resource
> file:/tmp/spark-a4457754-7183-44ce-bd0d-32a071757c92/__hadoop_conf__4372868703234608846.zip
> ->
> hdfs://hacluster/user/nss/.sparkStaging/application_1462442311990_0057/__hadoop_conf__4372868703234608846.zip
>
>
> the conf_file uploaded into hdfs was different.
>
> why is this happened?
> where can i find the resource file to be uploading?
>


Re: Re: How big the spark stream window could be ?

2016-05-09 Thread Saisai Shao
Please see the inline comments.


On Mon, May 9, 2016 at 5:31 PM, Ashok Kumar <ashok34...@yahoo.com> wrote:

> Thank you.
>
> So If I create spark streaming then
>
>
>1. The streams will always need to be cached? It cannot be stored in
>persistent storage
>
You don't need to cache the stream explicitly unless you have a specific
requirement; Spark will do it for you, depending on the streaming source
(Kafka or socket).

>
>1. The stream data cached will be distributed among all nodes of Spark
>among executors
>2. As I understand each Spark worker node has one executor that
>includes cache. So the streaming data is distributed among these work node
>caches. For example if I have 4 worker nodes each cache will have a quarter
>of data (this assumes that cache size among worker nodes is the same.)
>
Ideally, it will be distributed evenly across the executors; this is also a
target for tuning. Normally it depends on several conditions, such as
receiver distribution and partition distribution.


>
> The issue raises if the amount of streaming data does not fit into these 4
> caches? Will the job crash?
>
>
> On Monday, 9 May 2016, 10:16, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>
> No, each executor only stores part of data in memory (it depends on how
> the partition are distributed and how many receivers you have).
>
> For WindowedDStream, it will obviously cache the data in memory, from my
> understanding you don't need to call cache() again.
>
> On Mon, May 9, 2016 at 5:06 PM, Ashok Kumar <ashok34...@yahoo.com> wrote:
>
> hi,
>
> so if i have 10gb of streaming data coming in does it require 10gb of
> memory in each node?
>
> also in that case why do we need using
>
> dstream.cache()
>
> thanks
>
>
> On Monday, 9 May 2016, 9:58, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>
> It depends on you to write the Spark application, normally if data is
> already on the persistent storage, there's no need to be put into memory.
> The reason why Spark Streaming has to be stored in memory is that streaming
> source is not persistent source, so you need to have a place to store the
> data.
>
> On Mon, May 9, 2016 at 4:43 PM, 李明伟 <kramer2...@126.com> wrote:
>
> Thanks.
> What if I use batch calculation instead of stream computing? Do I still
> need that much memory? For example, if the 24 hour data set is 100 GB. Do I
> also need a 100GB RAM to do the one time batch calculation ?
>
>
>
>
>
> At 2016-05-09 15:14:47, "Saisai Shao" <sai.sai.s...@gmail.com> wrote:
>
> For window related operators, Spark Streaming will cache the data into
> memory within this window, in your case your window size is up to 24 hours,
> which means data has to be in Executor's memory for more than 1 day, this
> may introduce several problems when memory is not enough.
>
> On Mon, May 9, 2016 at 3:01 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> > wrote:
>
> ok terms for Spark Streaming
>
> "Batch interval" is the basic interval at which the system with receive
> the data in batches.
> This is the interval set when creating a StreamingContext. For example, if
> you set the batch interval as 300 seconds, then any input DStream will
> generate RDDs of received data at 300 seconds intervals.
> A window operator is defined by two parameters -
> - WindowDuration / WindowsLength - the length of the window
> - SlideDuration / SlidingInterval - the interval at which the window will
> slide or move forward
>
>
> Ok so your batch interval is 5 minutes. That is the rate messages are
> coming in from the source.
>
> Then you have these two params
>
> // window length - The duration of the window below that must be multiple
> of batch interval n in = > StreamingContext(sparkConf, Seconds(n))
> val windowLength = x =  m * n
> // sliding interval - The interval at which the window operation is
> performed in other words data is collected within this "previous interval'
> val slidingInterval =  y l x/y = even number
>
> Both the window length and the slidingInterval duration must be multiples
> of the batch interval, as received data is divided into batches of duration
> "batch interval".
>
> If you want to collect 1 hour data then windowLength =  12 * 5 * 60
> seconds
> If you want to collect 24 hour data then windowLength =  24 * 12 * 5 * 60
>
> You sliding window should be set to batch interval = 5 * 60 seconds. In
> other words that where the aggregates and summaries come for your report.
>
> What is your data source here?
>
> HTH
>
>
> Dr Mich Talebzadeh
>
> LinkedIn * 
> https:

Re: Re: How big the spark stream window could be ?

2016-05-09 Thread Saisai Shao
No, each executor only stores part of the data in memory (it depends on how
the partitions are distributed and how many receivers you have).

For WindowedDStream, the data is already cached in memory, so from my
understanding you don't need to call cache() again.

On Mon, May 9, 2016 at 5:06 PM, Ashok Kumar <ashok34...@yahoo.com> wrote:

> hi,
>
> so if i have 10gb of streaming data coming in does it require 10gb of
> memory in each node?
>
> also in that case why do we need using
>
> dstream.cache()
>
> thanks
>
>
> On Monday, 9 May 2016, 9:58, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>
> It depends on you to write the Spark application, normally if data is
> already on the persistent storage, there's no need to be put into memory.
> The reason why Spark Streaming has to be stored in memory is that streaming
> source is not persistent source, so you need to have a place to store the
> data.
>
> On Mon, May 9, 2016 at 4:43 PM, 李明伟 <kramer2...@126.com> wrote:
>
> Thanks.
> What if I use batch calculation instead of stream computing? Do I still
> need that much memory? For example, if the 24 hour data set is 100 GB. Do I
> also need a 100GB RAM to do the one time batch calculation ?
>
>
>
>
>
> At 2016-05-09 15:14:47, "Saisai Shao" <sai.sai.s...@gmail.com> wrote:
>
> For window related operators, Spark Streaming will cache the data into
> memory within this window, in your case your window size is up to 24 hours,
> which means data has to be in Executor's memory for more than 1 day, this
> may introduce several problems when memory is not enough.
>
> On Mon, May 9, 2016 at 3:01 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> > wrote:
>
> ok terms for Spark Streaming
>
> "Batch interval" is the basic interval at which the system with receive
> the data in batches.
> This is the interval set when creating a StreamingContext. For example, if
> you set the batch interval as 300 seconds, then any input DStream will
> generate RDDs of received data at 300 seconds intervals.
> A window operator is defined by two parameters -
> - WindowDuration / WindowsLength - the length of the window
> - SlideDuration / SlidingInterval - the interval at which the window will
> slide or move forward
>
>
> Ok so your batch interval is 5 minutes. That is the rate messages are
> coming in from the source.
>
> Then you have these two params
>
> // window length - The duration of the window below that must be multiple
> of batch interval n in = > StreamingContext(sparkConf, Seconds(n))
> val windowLength = x =  m * n
> // sliding interval - The interval at which the window operation is
> performed in other words data is collected within this "previous interval'
> val slidingInterval =  y l x/y = even number
>
> Both the window length and the slidingInterval duration must be multiples
> of the batch interval, as received data is divided into batches of duration
> "batch interval".
>
> If you want to collect 1 hour data then windowLength =  12 * 5 * 60
> seconds
> If you want to collect 24 hour data then windowLength =  24 * 12 * 5 * 60
>
> You sliding window should be set to batch interval = 5 * 60 seconds. In
> other words that where the aggregates and summaries come for your report.
>
> What is your data source here?
>
> HTH
>
>
> Dr Mich Talebzadeh
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
> http://talebzadehmich.wordpress.com
>
>
> On 9 May 2016 at 04:19, kramer2...@126.com <kramer2...@126.com> wrote:
>
> We have some stream data need to be calculated and considering use spark
> stream to do it.
>
> We need to generate three kinds of reports. The reports are based on
>
> 1. The last 5 minutes data
> 2. The last 1 hour data
> 3. The last 24 hour data
>
> The frequency of reports is 5 minutes.
>
> After reading the docs, the most obvious way to solve this seems to set up
> a
> spark stream with 5 minutes interval and two window which are 1 hour and 1
> day.
>
>
> But I am worrying that if the window is too big for one day and one hour. I
> do not have much experience on spark stream, so what is the window length
> in
> your environment?
>
> Any official docs talking about this?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-big-the-spark-stream-window-could-be-tp26899.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Re: How big the spark stream window could be ?

2016-05-09 Thread Saisai Shao
It depends on how you write the Spark application. Normally, if the data is
already on persistent storage, there's no need to put it into memory.
Spark Streaming has to store received data in memory because a streaming
source is not a persistent source, so you need a place to hold the
data.

On Mon, May 9, 2016 at 4:43 PM, 李明伟 <kramer2...@126.com> wrote:

> Thanks.
> What if I use batch calculation instead of stream computing? Do I still
> need that much memory? For example, if the 24 hour data set is 100 GB. Do I
> also need a 100GB RAM to do the one time batch calculation ?
>
>
>
>
>
> At 2016-05-09 15:14:47, "Saisai Shao" <sai.sai.s...@gmail.com> wrote:
>
> For window related operators, Spark Streaming will cache the data into
> memory within this window, in your case your window size is up to 24 hours,
> which means data has to be in Executor's memory for more than 1 day, this
> may introduce several problems when memory is not enough.
>
> On Mon, May 9, 2016 at 3:01 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> > wrote:
>
>> ok terms for Spark Streaming
>>
>> "Batch interval" is the basic interval at which the system with receive
>> the data in batches.
>> This is the interval set when creating a StreamingContext. For example,
>> if you set the batch interval as 300 seconds, then any input DStream will
>> generate RDDs of received data at 300 seconds intervals.
>> A window operator is defined by two parameters -
>> - WindowDuration / WindowsLength - the length of the window
>> - SlideDuration / SlidingInterval - the interval at which the window will
>> slide or move forward
>>
>>
>> Ok so your batch interval is 5 minutes. That is the rate messages are
>> coming in from the source.
>>
>> Then you have these two params
>>
>> // window length - The duration of the window below that must be multiple
>> of batch interval n in = > StreamingContext(sparkConf, Seconds(n))
>> val windowLength = x =  m * n
>> // sliding interval - The interval at which the window operation is
>> performed in other words data is collected within this "previous interval'
>> val slidingInterval =  y l x/y = even number
>>
>> Both the window length and the slidingInterval duration must be multiples
>> of the batch interval, as received data is divided into batches of duration
>> "batch interval".
>>
>> If you want to collect 1 hour data then windowLength =  12 * 5 * 60
>> seconds
>> If you want to collect 24 hour data then windowLength =  24 * 12 * 5 * 60
>>
>> You sliding window should be set to batch interval = 5 * 60 seconds. In
>> other words that where the aggregates and summaries come for your report.
>>
>> What is your data source here?
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 9 May 2016 at 04:19, kramer2...@126.com <kramer2...@126.com> wrote:
>>
>>> We have some stream data need to be calculated and considering use spark
>>> stream to do it.
>>>
>>> We need to generate three kinds of reports. The reports are based on
>>>
>>> 1. The last 5 minutes data
>>> 2. The last 1 hour data
>>> 3. The last 24 hour data
>>>
>>> The frequency of reports is 5 minutes.
>>>
>>> After reading the docs, the most obvious way to solve this seems to set
>>> up a
>>> spark stream with 5 minutes interval and two window which are 1 hour and
>>> 1
>>> day.
>>>
>>>
>>> But I am worrying that if the window is too big for one day and one
>>> hour. I
>>> do not have much experience on spark stream, so what is the window
>>> length in
>>> your environment?
>>>
>>> Any official docs talking about this?
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-big-the-spark-stream-window-could-be-tp26899.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
>
>


Re: How big the spark stream window could be ?

2016-05-09 Thread Saisai Shao
What do you mean by swap space: the system swap space or Spark's block
manager disk space?

If you're referring to system swap space, I think you should first consider
the JVM heap size and YARN container size before running out of system memory.

If you're referring to block manager disk space, the StorageLevel of
WindowedDStream is MEMORY_ONLY_SER, so data will not be spilled to disk when
executor memory is not enough.


On Mon, May 9, 2016 at 3:26 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> That is a valid point Shao. However, it will start using disk space as
> memory storage akin to swap space. It will not crash I believe it will just
> be slow and this assumes that you do not run out of disk space.
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 9 May 2016 at 08:14, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>> For window related operators, Spark Streaming will cache the data into
>> memory within this window, in your case your window size is up to 24 hours,
>> which means data has to be in Executor's memory for more than 1 day, this
>> may introduce several problems when memory is not enough.
>>
>> On Mon, May 9, 2016 at 3:01 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> ok terms for Spark Streaming
>>>
>>> "Batch interval" is the basic interval at which the system with receive
>>> the data in batches.
>>> This is the interval set when creating a StreamingContext. For example,
>>> if you set the batch interval as 300 seconds, then any input DStream will
>>> generate RDDs of received data at 300 seconds intervals.
>>> A window operator is defined by two parameters -
>>> - WindowDuration / WindowsLength - the length of the window
>>> - SlideDuration / SlidingInterval - the interval at which the window
>>> will slide or move forward
>>>
>>>
>>> Ok so your batch interval is 5 minutes. That is the rate messages are
>>> coming in from the source.
>>>
>>> Then you have these two params
>>>
>>> // window length - The duration of the window below that must be
>>> multiple of batch interval n in = > StreamingContext(sparkConf, Seconds(n))
>>> val windowLength = x =  m * n
>>> // sliding interval - The interval at which the window operation is
>>> performed in other words data is collected within this "previous interval'
>>> val slidingInterval =  y l x/y = even number
>>>
>>> Both the window length and the slidingInterval duration must be
>>> multiples of the batch interval, as received data is divided into batches
>>> of duration "batch interval".
>>>
>>> If you want to collect 1 hour data then windowLength =  12 * 5 * 60
>>> seconds
>>> If you want to collect 24 hour data then windowLength =  24 * 12 * 5 *
>>> 60
>>>
>>> You sliding window should be set to batch interval = 5 * 60 seconds. In
>>> other words that where the aggregates and summaries come for your report.
>>>
>>> What is your data source here?
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 9 May 2016 at 04:19, kramer2...@126.com <kramer2...@126.com> wrote:
>>>
>>>> We have some stream data need to be calculated and considering use spark
>>>> stream to do it.
>>>>
>>>> We need to generate three kinds of reports. The reports are based on
>>>>
>>>> 1. The last 5 minutes data
>>>> 2. The last 1 hour data
>>>> 3. The last 24 hour data
>>>>
>>>> The frequency of reports is 5 minutes.
>>>>
>>>> After reading the docs, the most obvious way to solve this seems to set
>>>> up a
>>>> spark stream with 5 minutes interval and two window which are 1 hour
>>>> and 1
>>>> day.
>>>>
>>>>
>>>> But I am worrying that if the window is too big for one day and one
>>>> hour. I
>>>> do not have much experience on spark stream, so what is the window
>>>> length in
>>>> your environment?
>>>>
>>>> Any official docs talking about this?
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-big-the-spark-stream-window-could-be-tp26899.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: How big the spark stream window could be ?

2016-05-09 Thread Saisai Shao
For window-related operators, Spark Streaming caches the data within the
window in memory. In your case the window size is up to 24 hours, which
means data has to stay in executor memory for more than 1 day. This may
introduce several problems when memory is not enough.
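
To put rough numbers on this, here is a back-of-envelope sketch in plain
Python (no Spark required). The 5-minute batch interval and the 1-hour and
24-hour windows come from this thread; the ingest rate of 2 MiB/s is purely
an assumed figure, and the estimate ignores serialization, replication and
object overhead, so treat it as a lower bound rather than a sizing rule.

```python
# Back-of-envelope sizing for windowed streams; plain Python, no Spark needed.
BATCH_INTERVAL_S = 5 * 60  # batch interval from the thread: 5 minutes


def batches_in_window(window_s, batch_s=BATCH_INTERVAL_S):
    """Number of batches a window spans; the window must be a multiple of the batch interval."""
    if window_s % batch_s != 0:
        raise ValueError("window length must be a multiple of the batch interval")
    return window_s // batch_s


def window_bytes(window_s, ingest_bytes_per_s):
    """Raw bytes that must stay cached for one full window (ignores overheads)."""
    return window_s * ingest_bytes_per_s


ONE_HOUR_S = 60 * 60
ONE_DAY_S = 24 * ONE_HOUR_S
ASSUMED_RATE = 2 * 1024 * 1024  # hypothetical ingest rate: 2 MiB/s

for name, length in (("1 hour", ONE_HOUR_S), ("24 hours", ONE_DAY_S)):
    gib = window_bytes(length, ASSUMED_RATE) / 1024 ** 3
    print(f"{name}: {batches_in_window(length)} batches, ~{gib:.1f} GiB cached")
```

With a 5-minute batch interval the 1-hour window spans 12 batches and the
24-hour window spans 288, so the cached volume grows linearly with the
window length at whatever the real ingest rate turns out to be.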

On Mon, May 9, 2016 at 3:01 PM, Mich Talebzadeh 
wrote:

> ok terms for Spark Streaming
>
> "Batch interval" is the basic interval at which the system with receive
> the data in batches.
> This is the interval set when creating a StreamingContext. For example, if
> you set the batch interval as 300 seconds, then any input DStream will
> generate RDDs of received data at 300 seconds intervals.
> A window operator is defined by two parameters -
> - WindowDuration / WindowsLength - the length of the window
> - SlideDuration / SlidingInterval - the interval at which the window will
> slide or move forward
>
>
> Ok so your batch interval is 5 minutes. That is the rate messages are
> coming in from the source.
>
> Then you have these two params
>
> // window length - The duration of the window below that must be multiple
> of batch interval n in = > StreamingContext(sparkConf, Seconds(n))
> val windowLength = x =  m * n
> // sliding interval - The interval at which the window operation is
> performed in other words data is collected within this "previous interval'
> val slidingInterval =  y l x/y = even number
>
> Both the window length and the slidingInterval duration must be multiples
> of the batch interval, as received data is divided into batches of duration
> "batch interval".
>
> If you want to collect 1 hour data then windowLength =  12 * 5 * 60
> seconds
> If you want to collect 24 hour data then windowLength =  24 * 12 * 5 * 60
>
> You sliding window should be set to batch interval = 5 * 60 seconds. In
> other words that where the aggregates and summaries come for your report.
>
> What is your data source here?
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 9 May 2016 at 04:19, kramer2...@126.com  wrote:
>
>> We have some stream data need to be calculated and considering use spark
>> stream to do it.
>>
>> We need to generate three kinds of reports. The reports are based on
>>
>> 1. The last 5 minutes data
>> 2. The last 1 hour data
>> 3. The last 24 hour data
>>
>> The frequency of reports is 5 minutes.
>>
>> After reading the docs, the most obvious way to solve this seems to set
>> up a
>> spark stream with 5 minutes interval and two window which are 1 hour and 1
>> day.
>>
>>
>> But I am worrying that if the window is too big for one day and one hour.
>> I
>> do not have much experience on spark stream, so what is the window length
>> in
>> your environment?
>>
>> Any official docs talking about this?
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-big-the-spark-stream-window-could-be-tp26899.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Fw: Significant performance difference for same spark job in scala vs pyspark

2016-05-05 Thread Saisai Shao
Writing an RDD-based application using pyspark brings additional
overhead. Spark runs on the JVM whereas your Python code runs on the
Python runtime, so data has to be passed between the JVM world and the
Python world, which requires additional serialization/deserialization and
IPC. Other parts add overhead as well. So the performance difference is
expected, but you could tune the application to reduce the gap.

Also, because the Python RDD wraps a lot of operations, the DAG you saw is
different from Scala's; that is also expected.
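
As a rough illustration of the serialization cost only, here is a small
plain-Python sketch. It uses pickle as a stand-in for the hand-off between
the JVM and the Python workers; it is not Spark's actual code path, the
batch size of 1000 is an arbitrary assumption, and the timings it prints
will vary from machine to machine.

```python
# Rough illustration of per-batch serialization cost, using pickle as a
# stand-in for the JVM <-> Python worker hand-off. Not Spark's actual code path.
import pickle
import time


def count_in_process(words, wanted):
    """Filter and count without leaving the current runtime."""
    return sum(1 for w in words if w in wanted)


def count_via_round_trip(words, wanted, batch_size=1000):
    """Same count, but each batch is serialized and deserialized first."""
    total = 0
    for start in range(0, len(words), batch_size):
        payload = pickle.dumps(words[start:start + batch_size])  # "send"
        batch = pickle.loads(payload)                            # "receive"
        total += sum(1 for w in batch if w in wanted)
    return total


words = ["Gutenberg", "flower", "a", "other"] * 50000
wanted = {"Gutenberg", "flower", "a"}

for fn in (count_in_process, count_via_round_trip):
    started = time.perf_counter()
    result = fn(words, wanted)
    print(fn.__name__, result, f"{time.perf_counter() - started:.3f}s")
```

Both functions return the same count; the second one simply pays for an
extra encode/decode step on every batch, which is the kind of cost the
Python RDD path adds on top of the JVM work.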

Thanks
Saisai


On Fri, May 6, 2016 at 12:47 PM, pratik gawande 
wrote:

> Hello,
>
> I am new to spark. For one of  job I am finding significant performance
> difference when run in pyspark vs scala. Could you please let me know if
> this is known and scala is preferred over python for writing spark jobs?
> Also DAG visualization shows completely different DAGs for scala and
> pyspark. I have pasted DAG for both using toDebugString() method. Let me
> know if you need any additional information.
>
> *Time for Job in scala* : 52 secs
>
> *Time for job in pyspark *: 4.2 min
>
>
> *Scala code in Zepplin:*
>
> val lines = sc.textFile("s3://[test-bucket]/output2/")
> val words = lines.flatMap(line => line.split(" "))
> val filteredWords = words.filter(word => word.equals("Gutenberg") ||
> word.equals("flower") || word.equals("a"))
> val wordMap = filteredWords.map(word => (word, 1)).reduceByKey(_ + _)
> wordMap.collect()
>
> *pyspark code in Zepplin:*
>
> lines = sc.textFile("s3://[test-bucket]/output2/")
> words = lines.flatMap(lambda x: x.split())
> filteredWords = words.filter(lambda x: (x == "Gutenberg" or x == "flower"
> or x == "a"))
> result = filteredWords.map(lambda x: (x, 1)).reduceByKey(lambda a,b:
> a+b).collect()
> print result
>
> *Scala final RDD:*
>
>
> *print wordMap.toDebugString() *
>
>  lines: org.apache.spark.rdd.RDD[String] = s3://[test-bucket]/output2/ MapPartitionsRDD[108] at textFile at :30
> words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[109] at flatMap at :31
> filteredWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[110] at filter at :33
> wordMap: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[112] at reduceByKey at :35
> (10) ShuffledRDD[112] at reduceByKey at :35 []
>  +-(10) MapPartitionsRDD[111] at map at :35 []
>     | MapPartitionsRDD[110] at filter at :33 []
>     | MapPartitionsRDD[109] at flatMap at :31 []
>     | s3://[test-bucket]/output2/ MapPartitionsRDD[108] at textFile at :30 []
>     | s3://[test-bucket]/output2/ HadoopRDD[107] at textFile at :30 []
>
>
> *PySpark final RDD:*
>
>
> *println(wordMap.toDebugString) *
>
> (10) PythonRDD[119] at RDD at PythonRDD.scala:43 []
>  | s3://[test-bucket]/output2/ MapPartitionsRDD[114] at textFile at null:-1 []
>  | s3://[test-bucket]/output2/ HadoopRDD[113] at textFile at null:-1 []
> PythonRDD[120] at RDD at PythonRDD.scala:43
>
>
> Thanks,
>
> Pratik
>


Re: kafka direct streaming python API fromOffsets

2016-05-03 Thread Saisai Shao
I guess the problem is that py4j automatically translates a Python int
into a Java int or long according to its value. If the value is small
it is translated to a Java int; otherwise it is translated into a Java
long.

But in the Java code the parameter must be of long type, which is why you
got the exception.

AFAIK, if you're using Python 2, you could specify the long type explicitly,
like 123L or long(123), so the value will be translated into a Java long.
If you're using Python 3, which has no separate long type, I'm currently not
sure if there's a workaround for it.

You could refer to the Python Kafka unit tests to see the details of using
the Python API.
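
The value-based conversion described above can be modelled in a few lines
of plain Python. This is only a sketch of the behaviour as I understand it,
not py4j's own code; the function name java_type_for is made up for
illustration, and the boundaries are simply the range of a signed 32-bit
Java int.

```python
# Model of how a plain Python int is mapped to a Java type when it crosses py4j.
# This mirrors the behaviour described above; it is not py4j's own code.
JAVA_INT_MIN = -2 ** 31
JAVA_INT_MAX = 2 ** 31 - 1


def java_type_for(value):
    """Return the Java boxed type a plain Python int would be converted to."""
    if JAVA_INT_MIN <= value <= JAVA_INT_MAX:
        return "java.lang.Integer"
    return "java.lang.Long"


for offset in (123, 2 ** 31 - 1, 2 ** 31, 5000000000):
    print(offset, "->", java_type_for(offset))
```

Under this model a small offset such as 123 arrives as an Integer, which
matches the ClassCastException you saw, while offsets above the 32-bit
range arrive as a Long and pass the cast.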

Thanks
Saisai



On Tue, May 3, 2016 at 4:11 PM, Tigran Avanesov <
tigran.avane...@olamobile.com> wrote:

> Thank you,
>
> But now I have this error:
>
> java.lang.ClassCastException: java.lang.Integer cannot be cast to
> java.lang.Long
>
> My offsets are actually not big enough to be long. If I put bigger values,
> I have no such exception.
> For me looks like a bug.
>
> Any ideas for a workaround?
>
> Thank!
>
>
> On 05/02/2016 06:57 PM, Cody Koeninger wrote:
>
>> If you're confused about the type of an argument, you're probably
>> better off looking at documentation that includes static types:
>>
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
>>
>> createDirectStream's fromOffsets parameter takes a map from
>> TopicAndPartition to Long.
>>
>> There is documentation for a python constructor for TopicAndPartition:
>>
>>
>> http://spark.apache.org/docs/latest/api/python/_modules/pyspark/streaming/kafka.html#TopicAndPartition
>>
>>
>> On Mon, May 2, 2016 at 5:54 AM, Tigran Avanesov
>>  wrote:
>>
>>> Hi,
>>>
>>> I'm trying to start consuming messages from a kafka topic (via direct
>>> stream) from a given offset.
>>> The documentation of createDirectStream says:
>>>
>>> :param fromOffsets: Per-topic/partition Kafka offsets defining the
>>> (inclusive) starting
>>> point of the stream.
>>>
>>> However it expects a dictionary of topics (not names...), as i tried to
>>> feed
>>> it something like { 'topic' : {0: 123, 1:234}}, and of course got an
>>> exception.
>>> How should I build this fromOffsets parameter?
>>>
>>> Documentation does not say anything about it.
>>> (In general, I think it would be better if the function accepted topic
>>> names)
>>>
>>> Thank you!
>>>
>>> Regards,
>>> Tigran
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
> --
>
> Tigran Avanesov | IT Architect
> phone: +352 261911 3562
> email: tigran.avane...@olamobile.com
> skype: tigran.avanesov.corporate
> post:  Olamobile S.à.r.l.
>2-4 rue Eugène Ruppert
>Bâtiment Vertigo-Polaris
>L-2453 Luxembourg
>Luxembourg
> web:   www.olamobile.com
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
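For the fromOffsets question in the thread above, here is a minimal sketch
of turning the nested {'topic': {partition: offset}} shape into the flat
per-partition mapping that createDirectStream expects. The helper name
flatten_offsets is hypothetical. The sketch uses plain tuples as keys so it
runs without Spark; wrapping each key in pyspark's TopicAndPartition is only
shown as a comment. It does not address whether small offsets reach the JVM
as Long, which is the ClassCastException discussed above.

```python
def flatten_offsets(nested):
    """Turn {'topic': {partition: offset}} into {(topic, partition): offset}.

    flatten_offsets is a hypothetical helper, not part of pyspark.
    """
    flat = {}
    for topic, partitions in nested.items():
        for partition, offset in partitions.items():
            flat[(topic, int(partition))] = int(offset)
    return flat


offsets = flatten_offsets({'topic': {0: 123, 1: 234}})
print(offsets)

# With pyspark (Spark 1.x) available, each key would then be wrapped, e.g.:
#   from pyspark.streaming.kafka import TopicAndPartition
#   from_offsets = {TopicAndPartition(t, p): o for (t, p), o in offsets.items()}
```

The resulting dictionary has one entry per topic/partition pair, which is
the granularity Cody describes for the Scala API.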


Re: Detecting application restart when running in supervised cluster mode

2016-04-05 Thread Saisai Shao
Hi Deepak,

I don't think supervise works with YARN; it is a feature specific to
standalone and Mesos.

Thanks
Saisai

On Tue, Apr 5, 2016 at 3:23 PM, Deepak Sharma  wrote:

> Hi Rafael
> If you are using yarn as the engine , you can always use RM UI to see the
> application progress.
>
> Thanks
> Deepak
>
> On Tue, Apr 5, 2016 at 12:18 PM, Rafael Barreto  > wrote:
>
>> Hello,
>>
>> I have a driver deployed using `spark-submit` in supervised cluster mode.
>> Sometimes my application would die for some transient problem and the
>> restart works perfectly. However, it would be useful to get alerted when
>> that happens. Is there any out-of-the-box way of doing that? Perhaps a hook
>> that I can use to catch an event? I guess I could poll my application state
>> using Spark REST API, but if there was something more elegant, I would
>> rather use it.
>>
>> Thanks in advance,
>> Rafael Barreto
>>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: --packages configuration equivalent item name?

2016-04-04 Thread Saisai Shao
spark.jars.ivy, spark.jars.packages and spark.jars.excludes are the
configurations you can use.
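
As a rough illustration, the snippet below renders the lines one might put
in conf/spark-defaults.conf so that --packages no longer has to be typed on
every launch. The helper render_spark_defaults is hypothetical; the
coordinate is the one from Russell's original command, and the
"key value" line layout is my understanding of the spark-defaults.conf
format, so please check it against your Spark version.

```python
def render_spark_defaults(packages, excludes=(), ivy_dir=None):
    """Render spark-defaults.conf lines for the spark.jars.* settings.

    render_spark_defaults is a hypothetical helper for illustration only.
    """
    lines = ["spark.jars.packages " + ",".join(packages)]
    if excludes:
        lines.append("spark.jars.excludes " + ",".join(excludes))
    if ivy_dir:
        lines.append("spark.jars.ivy " + ivy_dir)
    return "\n".join(lines)


conf_text = render_spark_defaults(["com.databricks:spark-csv_2.10:1.4.0"])
print(conf_text)
```

Multiple coordinates are joined with commas and no white space, the same
convention as on the command line.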

Thanks
Saisai

On Sun, Apr 3, 2016 at 1:59 AM, Russell Jurney 
wrote:

> Thanks, Andy!
>
> On Mon, Mar 28, 2016 at 8:44 AM, Andy Davidson <
> a...@santacruzintegration.com> wrote:
>
>> Hi Russell
>>
>> I use Jupyter python notebooks a lot. Here is how I start the server
>>
>> set -x # turn debugging on
>>
>> #set +x # turn debugging off
>>
>>
>> # https://github.com/databricks/spark-csv
>>
>> # http://spark-packages.org/package/datastax/spark-cassandra-connector
>>
>> #
>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md
>>
>> #
>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md#pyspark-with-data-frames
>>
>>
>> # packages are ',' separated with no white space
>>
>> extraPkgs="--packages
>> com.databricks:spark-csv_2.11:1.3.0,datastax:spark-cassandra-connector:1.6.0-M1-s_2.10"
>>
>>
>> export PYSPARK_PYTHON=python3
>>
>> export PYSPARK_DRIVER_PYTHON=python3
>>
>> IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark $extraPkgs --conf
>> spark.cassandra.connection.host=
>> ec2-54-153-102-232.us-west-1.compute.amazonaws.com $*
>>
>>
>>
>> From: Russell Jurney 
>> Date: Sunday, March 27, 2016 at 7:22 PM
>> To: "user @spark" 
>> Subject: --packages configuration equivalent item name?
>>
>> I run PySpark with CSV support like so: IPYTHON=1 pyspark --packages
>> com.databricks:spark-csv_2.10:1.4.0
>>
>> I don't want to type this --packages argument each time. Is there a
>> config item for --packages? I can't find one in the reference at
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> If there is no way to do this, please let me know so I can make a JIRA
>> for this feature.
>>
>> Thanks!
>> --
>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>
>>
>
>
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>


Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Saisai Shao
So I think a ramdisk is the simplest thing to try.

Besides, I think Reynold's suggestion is quite valid: with such a high-end
machine, putting everything in memory might not improve performance as
much as assumed, since the bottleneck will shift elsewhere, e.g. to memory
bandwidth, NUMA, or CPU efficiency (serialization/deserialization, data
processing...). The code design should also take such a usage scenario into
account in order to use resources more efficiently.

Thanks
Saisai

On Sat, Apr 2, 2016 at 7:27 AM, Michael Slavitch <slavi...@gmail.com> wrote:

> Yes we see it on final write.  Our preference is to eliminate this.
>
>
> On Fri, Apr 1, 2016, 7:25 PM Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>> Hi Michael, shuffle data (mapper output) have to be materialized into
>> disk finally, no matter how large memory you have, it is the design purpose
>> of Spark. In you scenario, since you have a big memory, shuffle spill
>> should not happen frequently, most of the disk IO you see might be final
>> shuffle file write.
>>
>> So if you want to avoid this disk IO, you could use ramdisk as Reynold
>> suggested. If you want to avoid FS overhead of ramdisk, you could try to
>> hack a new shuffle implementation, since shuffle framework is pluggable.
>>
>>
>> On Sat, Apr 2, 2016 at 6:48 AM, Michael Slavitch <slavi...@gmail.com>
>> wrote:
>>
>>> As I mentioned earlier this flag is now ignored.
>>>
>>>
>>> On Fri, Apr 1, 2016, 6:39 PM Michael Slavitch <slavi...@gmail.com>
>>> wrote:
>>>
>>>> Shuffling a 1tb set of keys and values (aka sort by key)  results in
>>>> about 500gb of io to disk if compression is enabled. Is there any way to
>>>> eliminate shuffling causing io?
>>>>
>>>> On Fri, Apr 1, 2016, 6:32 PM Reynold Xin <r...@databricks.com> wrote:
>>>>
>>>>> Michael - I'm not sure if you actually read my email, but spill has
>>>>> nothing to do with the shuffle files on disk. It was for the partitioning
>>>>> (i.e. sorting) process. If that flag is off, Spark will just run out of
>>>>> memory when data doesn't fit in memory.
>>>>>
>>>>>
>>>>> On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch <slavi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> RAMdisk is a fine interim step but there is a lot of layers
>>>>>> eliminated by keeping things in memory unless there is need for 
>>>>>> spillover.
>>>>>>   At one time there was support for turning off spilling.  That was
>>>>>> eliminated.  Why?
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan <mri...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I think Reynold's suggestion of using ram disk would be a good way to
>>>>>>> test if these are the bottlenecks or something else is.
>>>>>>> For most practical purposes, pointing local dir to ramdisk should
>>>>>>> effectively give you 'similar' performance as shuffling from memory.
>>>>>>>
>>>>>>> Are there concerns with taking that approach to test ? (I dont see
>>>>>>> any, but I am not sure if I missed something).
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Mridul
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch <slavi...@gmail.com>
>>>>>>> wrote:
>>>>>>> > I totally disagree that it’s not a problem.
>>>>>>> >
>>>>>>> > - Network fetch throughput on 40G Ethernet exceeds the throughput
>>>>>>> of NVME
>>>>>>> > drives.
>>>>>>> > - What Spark is depending on is Linux’s IO cache as an effective
>>>>>>> buffer pool
>>>>>>> > This is fine for small jobs but not for jobs with datasets in the
>>>>>>> TB/node
>>>>>>> > range.
>>>>>>> > - On larger jobs flushing the cache causes Linux to block.
>>>>>>> > - On a modern 56-hyperthread 2-socket host the latency caused by
>>>>>>> multiple
>>>>>>> > executors writing out to disk increases greatly.
>>>>>>> &

Re: Eliminating shuffle write and spill disk IO reads/writes in Spark

2016-04-01 Thread Saisai Shao
Hi Michael, shuffle data (mapper output) has to be materialized to disk
eventually, no matter how much memory you have; this is by design in
Spark. In your scenario, since you have a lot of memory, shuffle spill should
not happen frequently, so most of the disk IO you see is probably the final
shuffle file write.

So if you want to avoid this disk IO, you could use a ramdisk as Reynold
suggested. If you want to avoid the FS overhead of a ramdisk, you could try to
hack up a new shuffle implementation, since the shuffle framework is pluggable.
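
To make the ramdisk suggestion concrete, here is a small sketch that builds
the spark-submit arguments pointing spark.local.dir at one or more ramdisk
mounts. The helper local_dir_args and the path /mnt/ramdisk are assumptions
for illustration; the mount itself (e.g. a tmpfs) has to exist on every node
beforehand. As far as I know, on YARN the NodeManager's local directories
take precedence over spark.local.dir, so there the ramdisk would need to be
configured on the YARN side instead.

```python
def local_dir_args(ramdisk_dirs):
    """Build spark-submit arguments that point spark.local.dir at ramdisks.

    local_dir_args is a hypothetical helper; the directories are assumed to
    be ramdisk mounts that already exist on each node.
    """
    if not ramdisk_dirs:
        raise ValueError("need at least one directory")
    return ["--conf", "spark.local.dir=" + ",".join(ramdisk_dirs)]


args = local_dir_args(["/mnt/ramdisk"])
print(" ".join(args))
```

Shuffle files are still written through the file system API in this setup;
only the backing store changes, which is why it is a cheap way to test
whether disk is really the bottleneck.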


On Sat, Apr 2, 2016 at 6:48 AM, Michael Slavitch  wrote:

> As I mentioned earlier this flag is now ignored.
>
>
> On Fri, Apr 1, 2016, 6:39 PM Michael Slavitch  wrote:
>
>> Shuffling a 1tb set of keys and values (aka sort by key)  results in
>> about 500gb of io to disk if compression is enabled. Is there any way to
>> eliminate shuffling causing io?
>>
>> On Fri, Apr 1, 2016, 6:32 PM Reynold Xin  wrote:
>>
>>> Michael - I'm not sure if you actually read my email, but spill has
>>> nothing to do with the shuffle files on disk. It was for the partitioning
>>> (i.e. sorting) process. If that flag is off, Spark will just run out of
>>> memory when data doesn't fit in memory.
>>>
>>>
>>> On Fri, Apr 1, 2016 at 3:28 PM, Michael Slavitch 
>>> wrote:
>>>
>>>> RAMdisk is a fine interim step but there is a lot of layers eliminated
>>>> by keeping things in memory unless there is need for spillover.   At one
>>>> time there was support for turning off spilling.  That was eliminated.
>>>> Why?
>>>>
>>>>
>>>> On Fri, Apr 1, 2016, 6:05 PM Mridul Muralidharan 
>>>> wrote:
>>>>
> I think Reynold's suggestion of using ram disk would be a good way to
> test if these are the bottlenecks or something else is.
> For most practical purposes, pointing local dir to ramdisk should
> effectively give you 'similar' performance as shuffling from memory.
>
> Are there concerns with taking that approach to test ? (I dont see
> any, but I am not sure if I missed something).
>
>
> Regards,
> Mridul
>
>
>
>
> On Fri, Apr 1, 2016 at 2:10 PM, Michael Slavitch 
> wrote:
> > I totally disagree that it’s not a problem.
> >
> > - Network fetch throughput on 40G Ethernet exceeds the throughput of
> NVME
> > drives.
> > - What Spark is depending on is Linux’s IO cache as an effective
> buffer pool
> > This is fine for small jobs but not for jobs with datasets in the
> TB/node
> > range.
> > - On larger jobs flushing the cache causes Linux to block.
> > - On a modern 56-hyperthread 2-socket host the latency caused by
> multiple
> > executors writing out to disk increases greatly.
> >
> > I thought the whole point of Spark was in-memory computing?  It’s in
> fact
> > in-memory for some things but  use spark.local.dir as a buffer pool
> of
> > others.
> >
> > Hence, the performance of  Spark is gated by the performance of
> > spark.local.dir, even on large memory systems.
> >
> > "Currently it is not possible to not write shuffle files to disk.”
> >
> > What changes >would< make it possible?
> >
> > The only one that seems possible is to clone the shuffle service and
> make it
> > in-memory.
> >
> >
> >
> >
> >
> > On Apr 1, 2016, at 4:57 PM, Reynold Xin  wrote:
> >
> > spark.shuffle.spill actually has nothing to do with whether we write
> shuffle
> > files to disk. Currently it is not possible to not write shuffle
> files to
> > disk, and typically it is not a problem because the network fetch
> throughput
> > is lower than what disks can sustain. In most cases, especially with
> SSDs,
> > there is little difference between putting all of those in memory
> and on
> > disk.
> >
> > However, it is becoming more common to run Spark on a few number of
> beefy
> > nodes (e.g. 2 nodes each with 1TB of RAM). We do want to look into
> improving
> > performance for those. Meantime, you can setup local ramdisks on
> each node
> > for shuffle writes.
> >
> >
> >
> > On Fri, Apr 1, 2016 at 11:32 AM, Michael Slavitch <
> slavi...@gmail.com>
> > wrote:
> >>
> >> Hello;
> >>
> >> I’m working on spark with very large memory systems (2TB+) and
> notice that
> >> Spark spills to disk in shuffle.  Is there a way to force spark to
> stay in
> >> memory when doing shuffle operations?   The goal is to keep the
> shuffle data
> >> either in the heap or in off-heap memory (in 1.6.x) and never touch
> the IO
> >> subsystem.  I am willing to have the job fail if it runs out of RAM.
> >>
> >> spark.shuffle.spill true  is deprecated in 1.6 and does 

Re: Spark Metrics : Why is the Sink class declared private[spark] ?

2016-04-01 Thread Saisai Shao
There's a JIRA (https://issues.apache.org/jira/browse/SPARK-14151) about
it, please take a look.

Thanks
Saisai

On Sat, Apr 2, 2016 at 6:48 AM, Walid Lezzar  wrote:

> Hi,
>
> I looked into the spark code at how spark report metrics using the
> MetricsSystem class. I've seen that the spark MetricsSystem class when
> instantiated parses the metrics.properties file, tries to find the sinks
> class name and load them dynamically. It would be great to implement my own
> sink by inheriting from the org.apache.spark.metrics.sink.Sink class but
> unfortunately, this class has been declared private[spark] ! So it is not
> possible to inherit from it ! Why is that ? Is this gonna change in future
> spark versions ?
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Re: Is there a way to submit spark job to your by YARN REST API?

2016-03-22 Thread Saisai Shao
My question is whether this ACL control is provided by YARN, or whether you
have an in-house facility to handle it?

If you're referring to ContainerLaunchContext#setApplicationACLs, I think
Spark on YARN doesn't currently support it. As a feature, this is doable
in the current yarn/client code; there is no need to use the REST API.


setApplicationACLs(Map<ApplicationAccessType,String> acls)
<https://hadoop.apache.org/docs/stable2/api/src-html/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.html#line.197>



On Tue, Mar 22, 2016 at 10:28 PM, tony@tendcloud.com <
tony@tendcloud.com> wrote:

> Hi, Saisai,
> Thanks a lot for your reply. We want to have a way which we can control
> the user who submit spark jobs with program so that we can have security
> control on our data safety. So is there any good way for that?
>
> --
> 阎志涛(Tony)
>
> 北京腾云天下科技有限公司
> -
> ---
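> Email: tony@tendcloud.com
> Phone: 13911815695
> WeChat: zhitao_yan
> QQ: 4707059
> Address: Room 602, Aviation Service Building, Building 2, No. 39
> Dongzhimenwai Street, Dongcheng District, Beijing
> Postal code: 100027
>
> ----
> TalkingData.com <http://talkingdata.com/> - Let the data speak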
>
>
> *From:* Saisai Shao <sai.sai.s...@gmail.com>
> *Date:* 2016-03-22 18:03
> *To:* tony@tendcloud.com
> *CC:* user <user@spark.apache.org>
> *Subject:* Re: Is there a way to submit spark job to your by YARN REST
> API?
> I'm afraid Spark currently doesn't support submitting applications
> through the YARN REST API. However, YARN's AMRMClient is functionally
> equivalent to the REST API; which specific features are you referring to?
>
> Thanks
> Saisai
>
> On Tue, Mar 22, 2016 at 5:27 PM, tony@tendcloud.com <
> tony@tendcloud.com> wrote:
>
>> Hi, All,
>> We are trying to build a data processing workflow which will call
>> different spark jobs and we are using YARN. Because we want to constraint
>> ACL for those spark jobs, so we need to submit spark job to use Yarn REST
>> API( which we can pass application acl as parameters. So is there any Spark
>> API which can support that?   If no, is there any third party solutions for
>> that?
>>
>>
>> Thanks and Regards,
>>
>>
>> --
>> 阎志涛(Tony)
>>
>> 北京腾云天下科技有限公司
>> -
>> ---
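>> Email: tony@tendcloud.com
>> Phone: 13911815695
>> WeChat: zhitao_yan
>> QQ: 4707059
>> Address: Room 602, Aviation Service Building, Building 2, No. 39
>> Dongzhimenwai Street, Dongcheng District, Beijing
>> Postal code: 100027
>>
>>
>> TalkingData.com <http://talkingdata.com/> - Let the data speak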
>>
>
>


Re: Is there a way to submit spark job to your by YARN REST API?

2016-03-22 Thread Saisai Shao
I'm afraid Spark currently doesn't support submitting applications
through the YARN REST API. However, YARN's AMRMClient is functionally
equivalent to the REST API; which specific features are you referring to?

Thanks
Saisai

On Tue, Mar 22, 2016 at 5:27 PM, tony@tendcloud.com <
tony@tendcloud.com> wrote:

> Hi, All,
> We are trying to build a data processing workflow which will call
> different spark jobs and we are using YARN. Because we want to constraint
> ACL for those spark jobs, so we need to submit spark job to use Yarn REST
> API( which we can pass application acl as parameters. So is there any Spark
> API which can support that?   If no, is there any third party solutions for
> that?
>
>
> Thanks and Regards,
>
>
> --
> 阎志涛(Tony)
>
> 北京腾云天下科技有限公司
> -
> ---
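> Email: tony@tendcloud.com
> Phone: 13911815695
> WeChat: zhitao_yan
> QQ: 4707059
> Address: Room 602, Aviation Service Building, Building 2, No. 39
> Dongzhimenwai Street, Dongcheng District, Beijing
> Postal code: 100027
>
>
> TalkingData.com - Let the data speak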
>


Re: Issues facing while Running Spark Streaming Job in YARN cluster mode

2016-03-22 Thread Saisai Shao
I guess in local mode you're using the local FS instead of HDFS. Here the
exceptions are mainly thrown from HDFS when running on YARN, so I think it
would be better to check the status and configuration of HDFS to see whether
it is healthy.

Thanks
Saisai

On Tue, Mar 22, 2016 at 5:46 PM, Soni spark 
wrote:

> Hi ,
>
> I am able to run spark streaming job in local mode, when i try to run the
> same job in my YARN cluster, its throwing errors.
>
> Any help is appreciated in this regard
>
> Here are my Exception logs:
>
> Exception 1:
>
> java.net.SocketTimeoutException: 48 millis timeout while waiting for
> channel to be ready for write. ch :
> java.nio.channels.SocketChannel[connected local=/172.16.28.192:50010
> remote=/172.16.28.193:46147]
> at
> org.apache.hadoop.net.SocketIOWithTimeout.waitForIO(SocketIOWithTimeout.java:246)
> at
> org.apache.hadoop.net.SocketOutputStream.waitForWritable(SocketOutputStream.java:172)
> at
> org.apache.hadoop.net.SocketOutputStream.transferToFully(SocketOutputStream.java:220)
> at
> org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:559)
> at
> org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:728)
> at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:496)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opReadBlock(Receiver.java:116)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
> at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:235)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Exception 2:
>
>
> 2016-03-22 12:17:47,838 WARN org.apache.hadoop.hdfs.BlockReaderFactory:
> I/O error constructing remote block reader.
> java.nio.channels.ClosedByInterruptException
> at
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:658)
> at
> org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
> at
> org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3101)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:755)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:670)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:337)
> at
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> at
> org.apache.hadoop.hdfs.DFSInputStream.seekToBlockSource(DFSInputStream.java:1460)
> at
> org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:773)
> at
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:806)
> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:847)
> at java.io.DataInputStream.read(DataInputStream.java:100)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:84)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:52)
> at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:112)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
> at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:265)
> at
> org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 2016-03-22 12:17:47,838 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
> Container container_1458629096860_0001_01_01 transitioned from KILLING
> to DONE
> 2016-03-22 12:17:47,841 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
> Removing container_1458629096860_0001_01_01 from application
> application_1458629096860_0001
> 2016-03-22 12:17:47,842 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got
> event CONTAINER_STOP for appId 

Re: Enabling spark_shuffle service without restarting YARN Node Manager

2016-03-16 Thread Saisai Shao
If you want to avoid failing existing jobs while restarting the NM, you could
enable work-preserving restart for the NM. In that case, restarting the NM
will not affect the running containers (they can keep running), which could
alleviate the NM restart problem.

Thanks
Saisai

On Wed, Mar 16, 2016 at 6:30 PM, Alex Dzhagriev  wrote:

> Hi Vinay,
>
> I believe it's not possible as the spark-shuffle code should run in the
> same JVM process as the Node Manager. I haven't heard anything about on the
> fly bytecode loading in the Node Manger.
>
> Thanks, Alex.
>
> On Wed, Mar 16, 2016 at 10:12 AM, Vinay Kashyap 
> wrote:
>
>> Hi all,
>>
>> I am using *Spark 1.5.1* in *yarn-client* mode along with *CDH 5.5*
>>
>> As per the documentation to enable Dynamic Allocation of Executors in
>> Spark,
>> it is required to add the shuffle service jar to YARN Node Manager's
>> classpath and restart the YARN Node Manager.
>>
>> Is there any way to to dynamically supply the shuffle service jar
>> information from the application itself and avoid disturbing the running
>> YARN service.
>>
>> Tried couple of options by uploading the jar to hdfs and set
>> *yarn.application.classpath* but did not work. On container launch for
>> the executor it fails to recognize the shuffle service.
>>
>> Any help would be greatly appreciated.
>>
>> --
>> *Thanks and regards*
>> *Vinay Kashyap*
>>
>
>


Re: Job failed while submitting python to yarn programatically

2016-03-15 Thread Saisai Shao
You cannot directly launch a Spark application through yarn#client the way
you described; it is deprecated and not supported. You have to use
spark-submit to submit a Spark application to YARN.

The specific problem here is that you're invoking yarn#client to run the
Spark app in yarn-client mode (the default), in which the AM expects the
driver to be already started; here it apparently is not, so the AM throws
this exception.

Anyway, submitting a Spark application this way is not supported for
now; please refer to the docs for spark-submit.

Thanks
Saisai

On Wed, Mar 16, 2016 at 11:05 AM, Jeff Zhang  wrote:

> Could you try yarn-cluster mode ? Make sure your cluster nodes can reach
> your client machine and no firewall.
>
> On Wed, Mar 16, 2016 at 10:54 AM,  wrote:
>
>>
>> Hi all,
>>
>> We're trying to submit a python file, pi.py in this case, to yarn from
>> java
>> code but this kept failing(1.6.0).
>> It seems the AM uses the arguments we passed to pi.py as the driver IP
>> address.
>> Could someone help me figuring out how to get the job done. Thanks in
>> advance.
>>
>> The java code looks like below:
>>
>>   String[] args = new String[]{
>> "--name",
>> "Test Submit Python To Yarn From Java",
>> "--primary-py-file",
>> SPARK_HOME + "/examples/src/main/python/pi.py",
>> "--num-executors",
>> "5",
>> "--driver-memory",
>> "512m",
>> "--executor-memory",
>> "512m",
>> "--executor-cores",
>> "1",
>> "--arg",
>> args[0]
>> };
>>
>> Configuration config = new Configuration();
>> SparkConf sparkConf = new SparkConf();
>> ClientArguments clientArgs = new ClientArguments(args,
>> sparkConf
>> );
>> Client client = new Client(clientArgs, config, sparkConf);
>> client.run();
>>
>>
>> The jar is submitted by spark-submit::
>> ./bin/spark-submit --class SubmitPyYARNJobFromJava --master yarn-client
>> TestSubmitPythonFromJava.jar 10
>>
>>
>> The job submit to yarn just stay in ACCEPTED before it failed
>> What I can't figure out is, yarn log shows AM couldn't reach the driver at
>> 10:0, which is my argument passed to pi.py
>>
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>>
>> [jar:file:/data/1/yarn/local/usercache/root/filecache/2084/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>> SLF4J: Found binding in
>>
>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 16/03/15 17:54:44 INFO yarn.ApplicationMaster: Registered signal handlers
>> for [TERM, HUP, INT]
>> 16/03/15 17:54:45 INFO yarn.ApplicationMaster: ApplicationAttemptId:
>> appattempt_1458023046377_0499_01
>> 16/03/15 17:54:45 INFO spark.SecurityManager: Changing view acls to:
>> yarn,root
>> 16/03/15 17:54:45 INFO spark.SecurityManager: Changing modify acls to:
>> yarn,root
>> 16/03/15 17:54:45 INFO spark.SecurityManager: SecurityManager:
>> authentication disabled; ui acls disabled; users with view permissions:
>> Set
>> (yarn, root); users with modify permissions: Set(yarn, root)
>> 16/03/15 17:54:45 INFO yarn.ApplicationMaster: Waiting for Spark driver to
>> be reachable.
>> 16/03/15 17:54:45 ERROR yarn.ApplicationMaster: Failed to connect to
>> driver
>> at 10:0, retrying ...
>> 16/03/15 17:54:46 ERROR yarn.ApplicationMaster: Failed to connect to
>> driver
>> at 10:0, retrying ...
>> 16/03/15 17:54:46 ERROR yarn.ApplicationMaster: Failed to connect to
>> driver
>> at 10:0, retrying ...
>> .
>> 16/03/15 17:56:25 ERROR yarn.ApplicationMaster: Failed to connect to
>> driver
>> at 10:0, retrying ...
>> 16/03/15 17:56:26 ERROR yarn.ApplicationMaster: Uncaught exception:
>> org.apache.spark.SparkException: Failed to connect to driver!
>>  at
>> org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkDriver
>> (ApplicationMaster.scala:484)
>>  at
>> org.apache.spark.deploy.yarn.ApplicationMaster.runExecutorLauncher
>> (ApplicationMaster.scala:345)
>>  at org.apache.spark.deploy.yarn.ApplicationMaster.run
>> (ApplicationMaster.scala:187)
>>  at
>> org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun
>> $main$1.apply$mcV$sp(ApplicationMaster.scala:653)
>>  at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run
>> (SparkHadoopUtil.scala:69)
>>  at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run
>> (SparkHadoopUtil.scala:68)
>>  at java.security.AccessController.doPrivileged(Native
>> Method)
>>

Re: Spark streaming - update configuration while retaining write ahead log data?

2016-03-15 Thread Saisai Shao
Currently the configuration is part of the checkpoint data, and when
recovering from failure, Spark Streaming will fetch the configuration from
the checkpoint data, so even if you change the configuration file, the
recovered Spark Streaming application will not use it. So from my
understanding there's currently no way to handle your situation.

Thanks
Saisai

On Tue, Mar 15, 2016 at 5:12 PM, Ewan Leith 
wrote:

> Has anyone seen a way of updating the Spark streaming job configuration
> while retaining the existing data in the write ahead log?
>
>
>
> e.g. if you’ve launched a job without enough executors and a backlog has
> built up in the WAL, can you increase the number of executors without
> losing the WAL data?
>
>
>
> Thanks,
>
> Ewan
>


Re: Dynamic allocation doesn't work on YARN

2016-03-09 Thread Saisai Shao
I still think this information is not enough to explain the reason.

1. Does your YARN cluster have enough resources to start all 10 executors?
2. Would you please try the latest version, 1.6.0 or the master branch, to
see whether this is a bug that has already been fixed?
3. You could add
"log4j.logger.org.apache.spark.ExecutorAllocationManager=DEBUG" to your log4j
conf to expose more details; then maybe you could dig out some clues.


Thanks
Saisai

On Thu, Mar 10, 2016 at 10:18 AM, Jy Chen <chen.wah...@gmail.com> wrote:

> Sorry,the last configuration is also --conf
> spark.dynamicAllocation.cachedExecutorIdleTimeout=60s, "--conf" was lost
> when I copied it to mail.
>
> -- Forwarded message --
> From: Jy Chen <chen.wah...@gmail.com>
> Date: 2016-03-10 10:09 GMT+08:00
> Subject: Re: Dynamic allocation doesn't work on YARN
> To: Saisai Shao <sai.sai.s...@gmail.com>, user@spark.apache.org
>
>
> Hi,
> My Spark version is 1.5.1 with Hadoop 2.5.0-cdh5.2.0. These are my
> configurations of dynamic allocation:
> --master yarn-client --conf spark.dynamicAllocation.enabled=true --conf
> spark.shuffle.service.enabled=true --conf
> spark.dynamicAllocation.minExecutors=0 --conf
> spark.dynamicAllocation.initialExecutors=10
>  --conf spark.dynamicAllocation.executorIdleTimeout=60s
> spark.dynamicAllocation.cachedExecutorIdleTimeout=60s
>
> At first,it will remove 2 executors and then no more executors will be
> removed.
>
> Thanks
>
> 2016-03-09 17:24 GMT+08:00 Saisai Shao <sai.sai.s...@gmail.com>:
>
>> Would you please send out the configurations of dynamic allocation so we
>> could know better.
>>
>> On Wed, Mar 9, 2016 at 4:29 PM, Jy Chen <chen.wah...@gmail.com> wrote:
>>
>>> Hello everyone:
>>>
>>> I'm trying the dynamic allocation in Spark on YARN. I have followed
>>> configuration steps and started the shuffle service.
>>>
>>> Now it can request executors when the workload is heavy but it cannot
>>> remove executors. I try to open the spark shell and don’t run any command,
>>> no executor is removed after spark.dynamicAllocation.executorIdleTimeout
>>> interval.
>>> Am I missing something?
>>>
>>> Thanks.
>>>
>>>
>>
>
>


Re: Dynamic allocation doesn't work on YARN

2016-03-09 Thread Saisai Shao
Would you please send out your dynamic allocation configuration so we
can understand the problem better?

On Wed, Mar 9, 2016 at 4:29 PM, Jy Chen  wrote:

> Hello everyone:
>
> I'm trying the dynamic allocation in Spark on YARN. I have followed
> configuration steps and started the shuffle service.
>
> Now it can request executors when the workload is heavy but it cannot
> remove executors. I try to open the spark shell and don’t run any command,
> no executor is removed after spark.dynamicAllocation.executorIdleTimeout
> interval.
> Am I missing something?
>
> Thanks.
>
>


Re: How to compile Spark with private build of Hadoop

2016-03-08 Thread Saisai Shao
I think the first step is to publish your in-house built Hadoop
jars to your local Maven or Ivy repo, and then build Spark with a profile
like -Phadoop-2.x (you could use 2.7, or you may have to change the pom
file if you hit jar conflicts) and -Dhadoop.version=3.0.0-SNAPSHOT to build
against your specified version.
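
The second step could look roughly like the command assembled below. This is
only a sketch: the helper spark_build_command is hypothetical, and the
build/mvn wrapper, the -Pyarn profile and -DskipTests are taken from the
Spark build documentation as I remember it, so please check them against
your checkout. The profile and version values are the ones mentioned above.

```python
def spark_build_command(hadoop_profile, hadoop_version, extra_profiles=("yarn",)):
    """Assemble a Maven command line for building Spark against a Hadoop version.

    spark_build_command is a hypothetical helper; it only builds the
    argument list and does not run anything.
    """
    cmd = ["build/mvn"]
    cmd += ["-P" + profile for profile in extra_profiles]
    cmd += ["-P" + hadoop_profile, "-Dhadoop.version=" + hadoop_version]
    cmd += ["-DskipTests", "clean", "package"]
    return cmd


cmd = spark_build_command("hadoop-2.7", "3.0.0-SNAPSHOT")
print(" ".join(cmd))
```

The in-house Hadoop artifacts must already be resolvable from the local
repository, otherwise Maven will fail to find the SNAPSHOT version.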

Thanks
Saisai


Re: Spark executor killed without apparent reason

2016-03-03 Thread Saisai Shao
If it is due to a heartbeat problem and the driver explicitly killed the
executors, there should be some driver logs mentioning it, so you
could check the driver log. The container (executor) logs are also
useful: if the container was killed, there will be some signal-related
logs, like (SIGTERM).

On Thu, Mar 3, 2016 at 4:00 PM, Nirav Patel  wrote:

> There was nothing in nodemanager logs that indicated why container was
> killed.
>
> Here's the guess: Since killed executors were experiencing high GC
> activities (full GC) before death they most likely failed to respond to
> heart beat to driver or nodemanager and got killed due to it.
>
> This is more relevant to issue:
> 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
> maprnode5 has been quiet for 12 ms while there are outstanding
> requests. Assuming connection is dead; please adjust spark.network.timeout
> if this is wrong.
>
> Following has nothing to do with this. It was raised as I manually killed
> application at some point after too many executors were getting killed.
> " ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM"
>
> Thanks
>
> On Wed, Mar 2, 2016 at 8:22 AM, Nirav Patel  wrote:
>
>> I think that was due to manually killing application. ExecutorLost
>> started  around 04:46:21 and application was manually killed around
>> 05:54:41
>>
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,500 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
>> Stopping container with container Id:
>> *container_1456722312951_0450_01_01*
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,500 INFO org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger:
>> USER=xactly IP=10.250.70.119 OPERATION=Stop Container Request
>> TARGET=ContainerManageImpl RESULT=SUCCESS
>> APPID=application_1456722312951_0450 CONTAINERID=
>> *container_1456722312951_0450_01_01*
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,500 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
>> Container *container_1456722312951_0450_01_01* transitioned from
>> RUNNING to KILLING
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,501 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
>> Cleaning up container *container_1456722312951_0450_01_01*
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,507 WARN
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
>> code from container *container_1456722312951_0450_01_01* is : 143
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,520 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
>> Container *container_1456722312951_0450_01_01* transitioned from
>> KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,557 INFO
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor:
>> Deleting absolute path :
>> /tmp/hadoop-xactly/nm-local-dir/usercache/xactly/appcache/application_1456722312951_0450/
>> *container_1456722312951_0450_01_01*
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,558 INFO org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger:
>> USER=xactly OPERATION=Container Finished - Killed TARGET=ContainerImpl
>> RESULT=SUCCESS APPID=application_1456722312951_0450 CONTAINERID=
>> *container_1456722312951_0450_01_01*
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,558 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
>> Container *container_1456722312951_0450_01_01* transitioned from
>> CONTAINER_CLEANEDUP_AFTER_KILL to DONE
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,566 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
>> Removing *container_1456722312951_0450_01_01* from application
>> application_1456722312951_0450
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,567 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl:
>> Considering container*container_1456722312951_0450_01_01* for
>> log-aggregation
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:41,567 INFO org.apache.spark.network.yarn.YarnShuffleService:
>> Stopping container *container_1456722312951_0450_01_01*
>>
>> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
>> 05:54:42,504 INFO
>> org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: 

Re: Spark streaming: StorageLevel.MEMORY_AND_DISK_SER setting for KafkaUtils.createDirectStream

2016-03-02 Thread Saisai Shao
You don't have to specify the storage level for the direct Kafka API, since
it doesn't need to store the input data ahead of time. Only the
receiver-based approach lets you specify the storage level.
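
To make the difference concrete, here is a minimal sketch against the Spark 1.x Kafka 0.8 integration (spark-streaming-kafka). The broker address, ZooKeeper quorum, group id, and topic name are placeholders assumed for illustration.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object StorageLevelByApproach {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("storage-level-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Direct approach: there is no receiver, hence no storage level parameter.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker
    val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    // Receiver-based approach: received blocks are stored, so a level can be passed.
    val received = KafkaUtils.createStream(
      ssc, "zk1:2181", "example-group", Map("events" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

    direct.map(_._2).count().print()
    received.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```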

Thanks
Saisai

On Wed, Mar 2, 2016 at 7:08 PM, Vinti Maheshwari 
wrote:

> Hi All,
>
> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
> program as currently i am getting
> MetadataFetchFailedException*. *I am not sure where i should pass
> StorageLevel.MEMORY_AND_DISK, as it seems like createDirectStream doesn't
> allow to pass that parameter.
>
>
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](
>   ssc, kafkaParams, topicsSet)
>
>
> Full Error:
>
> *org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 0*
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
> at
> org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
> at
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> )
>
> Thanks,
> ~Vinti
>
>


Re: Kafka streaming receiver approach - new topic not read from beginning

2016-02-22 Thread Saisai Shao
You could set "auto.offset.reset" through the "kafkaParams" parameter,
which is available in some of the other overloaded createStream APIs.

By default Kafka reads from the latest offset unless you explicitly set
this; this is Kafka's behavior, not Spark's.
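
A minimal sketch of that overload, assuming the Spark 1.x Kafka 0.8 integration and the old consumer, where the value for "read from the beginning" is "smallest". The ZooKeeper address, group id, and topic name are placeholders. Note that Kafka only applies auto.offset.reset when the consumer group has no committed offset for the topic.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReadNewTopicFromBeginning {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-receiver-from-beginning")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder connection settings; replace with your own.
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "zk1:2181",
      "group.id"          -> "my-consumer-group",
      "auto.offset.reset" -> "smallest" // old consumer: start at the earliest offset
    )
    val topics = Map("my-new-topic" -> 1) // topic -> number of receiver threads

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```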

Thanks
Saisai

On Mon, Feb 22, 2016 at 5:52 PM, Paul Leclercq 
wrote:

> Hi,
>
> Do you know why, with the receiver approach
> and a *consumer group*, a new topic is not read from the beginning but
> from the latest offset?
>
> Code example :
>
>  val kafkaStream = KafkaUtils.createStream(streamingContext,
>  [ZK quorum], [consumer group id], [per-topic number of Kafka partitions 
> to consume])
>
>
> Is there a way to read *only new topics* from the beginning?
>
> From Confluence FAQ
>
>> Alternatively, you can configure the consumer by setting
>> auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest"
>> for the old consumer.
>
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
>
> Thanks
> --
>
> Paul Leclercq
>


Re: Yarn client mode: Setting environment variables

2016-02-17 Thread Saisai Shao
IIUC you want to set an environment variable, for example FOO=bar, on the
executor side. You could put "spark.executorEnv.FOO=bar" in the conf file;
the AM will pick up this configuration and set it as an environment
variable when launching the containers. Just list all the variables you
want on the executor side in the form spark.executorEnv.xxx=xxx.
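
The same thing can be done programmatically. The sketch below assumes the documented property prefix spark.executorEnv. and uses SparkConf.setExecutorEnv; FOO and bar are just the example names from this thread, and the job at the end is only there to check what the executors actually see.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ExecutorEnvExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("executor-env-sketch")
      .setExecutorEnv("FOO", "bar") // equivalent to .set("spark.executorEnv.FOO", "bar")
    val sc = new SparkContext(conf)

    // Read the variable inside tasks to confirm that the executors see it.
    val seen = sc.parallelize(1 to 4, 4)
      .map(_ => sys.env.getOrElse("FOO", "<unset>"))
      .distinct()
      .collect()
    println(seen.mkString(", "))

    sc.stop()
  }
}
```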

Thanks
Saisai

On Thu, Feb 18, 2016 at 3:31 AM, Lin Zhao  wrote:

> I've been trying to set some environment variables for the spark executors
> but haven't had much luck. I tried editing conf/spark-env.sh but it
> doesn't get through to the executors. I'm running 1.6.0 and yarn, any
> pointer is appreciated.
>
> Thanks,
> Lin
>


Re: IllegalStateException : When use --executor-cores option in YARN

2016-02-14 Thread Saisai Shao
Hi Divya,

Could you please provide the full stack trace of the exception? As far as I
know --executor-cores should work; we can tell more once we see the full
stack trace.

Performance depends on many different factors. I'd recommend checking the
Spark web UI to better understand how the application runs.

Spark shell is a command-line Spark application for interactively executing
Spark jobs, whereas spark-submit is used to submit your own Spark
applications (
http://spark.apache.org/docs/latest/submitting-applications.html).

Thanks
Saisai

On Mon, Feb 15, 2016 at 10:36 AM, Divya Gehlot 
wrote:

> Hi,
>
> I am starting spark-shell with following options :
> spark-shell --properties-file  /TestDivya/Spark/Oracle.properties --jars
> /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --driver-class-path
> /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --packages
> com.databricks:spark-csv_2.10:1.1.0  --master yarn-client --num-executors
> 10 --executor-cores 4 -i /TestDivya/Spark/Test.scala
>
> Got few queries :
> 1.Error :
> java.lang.IllegalStateException: SparkContext has been shutdown
>
> If I remove --executor-cores 4 .. It runs smoothly
>
> 2. with --num-executors 10 my spark job takes more time .
>  May I know why ?
>
> 3. Whats the difference between spark-shell and spark-submit
>
> I am new bee to Spark ..Apologies for such naive questions.
> Just  trying to figure out how to tune spark jobs to increase performance
> on Hadoop cluster on EC2.
> If anybody has real time experience ,please help me.
>
>
> Thanks,
> Divya
>


Re: Programmatically launching spark on yarn-client mode no longer works in spark 1.5.2

2016-01-28 Thread Saisai Shao
Sorry, I hadn't noticed this mail. This looks like a wrong command-line
argument problem, so please ignore my previous comment.

On Fri, Jan 29, 2016 at 11:58 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Thanks Saisai. I saw following in yarn container logs. I think that killed
> sparkcontext.
>
> 16/01/28 17:38:29 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
> *Unknown/unsupported param List*(--properties-file,
> /tmp/hadoop-xactly/nm-local-dir/usercache/nir/appcache/application_1453752281504_3427/container_1453752281504_3427_01_02/__spark_conf__/__spark_conf__.properties)
>
> Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
> Options:
>   --jar JAR_PATH         Path to your application's JAR file
>   --class CLASS_NAME     Name of your application's main class
>   --primary-py-file      A main Python file
>   --py-files PY_FILES    Comma-separated list of .zip, .egg, or .py files to
>                          place on the PYTHONPATH for Python apps.
>   --args ARGS            Arguments to be passed to your application's main class.
>                          Multiple invocations are possible, each will be passed in order.
>   --num-executors NUM    Number of executors to start (Default: 2)
>   --executor-cores NUM   Number of cores for the executors (Default: 1)
>   --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)
>
>
>
> But if you are saying creating sparkcontext manually in your application
> still works then I'll investigate more on my side. It just before I dig
> more I wanted to know if it was still supported.
>
> Nir
>
> On Thu, Jan 28, 2016 at 7:47 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> I think I met this problem before, this problem might be due to some race
>> conditions in exit period. The way you mentioned is still valid, this
>> problem only occurs when stopping the application.
>>
>> Thanks
>> Saisai
>>
>> On Fri, Jan 29, 2016 at 10:22 AM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> Hi, we were using spark 1.3.1 and launching our spark jobs on
>>> yarn-client mode programmatically via creating a sparkConf and sparkContext
>>> object manually. It was inspired from spark self-contained application
>>> example here:
>>>
>>>
>>> https://spark.apache.org/docs/1.5.2/quick-start.html#self-contained-applications
>>>
>>>
>>>
>>> Only additional configuration we would provide would be all related to
>>> yarn like executor instance, cores etc.
>>>
>>> However after upgrading to spark 1.5.2 above application breaks on a
>>> line `val sparkContext = new SparkContext(sparkConf)`
>>>
>>> 16/01/28 17:38:35 ERROR util.Utils: Uncaught exception in thread main
>>>
>>> java.lang.NullPointerException
>>>
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService.close(NettyBlockTransferService.scala:152)
>>>
>>> at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
>>>
>>> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
>>>
>>> at
>>> org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1749)
>>>
>>> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
>>>
>>> at org.apache.spark.SparkContext.stop(SparkContext.scala:1748)
>>>
>>> at org.apache.spark.SparkContext.<init>(SparkContext.scala:593)
>>>
>>>
>>> So is this approach still supposed to work? Or do I must use
>>> SparkLauncher class with spark 1.5.2?
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>
>>
>>
>
>
>
>


Re: Programmatically launching spark on yarn-client mode no longer works in spark 1.5.2

2016-01-28 Thread Saisai Shao
I think I have hit this problem before; it might be due to a race condition
during shutdown. The approach you mentioned is still valid; this problem
only occurs when stopping the application.

Thanks
Saisai

On Fri, Jan 29, 2016 at 10:22 AM, Nirav Patel  wrote:

> Hi, we were using spark 1.3.1 and launching our spark jobs on yarn-client
> mode programmatically via creating a sparkConf and sparkContext object
> manually. It was inspired from spark self-contained application example
> here:
>
>
> https://spark.apache.org/docs/1.5.2/quick-start.html#self-contained-applications
>
>
>
> Only additional configuration we would provide would be all related to
> yarn like executor instance, cores etc.
>
> However after upgrading to spark 1.5.2 above application breaks on a line `
> val sparkContext = new SparkContext(sparkConf)`
>
> 16/01/28 17:38:35 ERROR util.Utils: Uncaught exception in thread main
>
> java.lang.NullPointerException
>
> at
> org.apache.spark.network.netty.NettyBlockTransferService.close(NettyBlockTransferService.scala:152)
>
> at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
>
> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
>
> at
> org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1749)
>
> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
>
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1748)
>
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:593)
>
>
> So is this approach still supposed to work? Or do I must use SparkLauncher
> class with spark 1.5.2?
>
>
> Thanks
>
>
>


Re: How data locality is honored when spark is running on yarn

2016-01-27 Thread Saisai Shao
Hi Todd,

There are two levels of locality-based scheduling when you run Spark on
YARN with dynamic allocation enabled:

1. Container allocation is based on the locality ratio of pending tasks;
this is YARN-specific and only works with dynamic allocation enabled.
2. Task scheduling is locality-aware; this is the same for every cluster
manager.

Thanks
Saisai

On Thu, Jan 28, 2016 at 10:50 AM, Todd  wrote:

> Hi,
> I am kind of confused about how data locality is honored when Spark is
> running on YARN (client or cluster mode). Can someone please elaborate on
> this? Thanks!
>
>
>


Re: streaming textFileStream problem - got only ONE line

2016-01-26 Thread Saisai Shao
Is it possible that the file was still being written by another
application, so that Spark Streaming processed an incomplete file?

On Tue, Jan 26, 2016 at 5:30 AM, Shixiong(Ryan) Zhu  wrote:

> Did you move the file into "hdfs://helmhdfs/user/patcharee/cerdata/", or
> write into it directly? `textFileStream` requires that files must be
> written to the monitored directory by "moving" them from another location
> within the same file system.
>
> On Mon, Jan 25, 2016 at 6:30 AM, patcharee 
> wrote:
>
>> Hi,
>>
>> My streaming application is receiving data from file system and just
>> prints the input count every 1 sec interval, as the code below:
>>
>> val sparkConf = new SparkConf()
>> val ssc = new StreamingContext(sparkConf, Milliseconds(interval_ms))
>> val lines = ssc.textFileStream(args(0))
>> lines.count().print()
>>
>> The problem is sometimes the data received from ssc.textFileStream is
>> ONLY ONE line. But in fact there are multiple lines in the new file found
>> in that interval. See log below which shows three intervals. In the 2nd
>> interval, the new file is:
>> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt. This file
>> contains 6288 lines. The ssc.textFileStream returns ONLY ONE line (the
>> header).
>>
>> Any ideas/suggestions what the problem is?
>>
>>
>> -
>> SPARK LOG
>>
>> -
>>
>> 16/01/25 15:11:11 INFO FileInputDStream: Cleared 1 old files that were
>> older than 1453731011000 ms: 145373101 ms
>> 16/01/25 15:11:11 INFO FileInputDStream: Cleared 0 old files that were
>> older than 1453731011000 ms:
>> 16/01/25 15:11:12 INFO FileInputDStream: Finding new files took 4 ms
>> 16/01/25 15:11:12 INFO FileInputDStream: New files at time 1453731072000
>> ms:
>> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19616.txt
>> ---
>> Time: 1453731072000 ms
>> ---
>> 6288
>>
>> 16/01/25 15:11:12 INFO FileInputDStream: Cleared 1 old files that were
>> older than 1453731012000 ms: 1453731011000 ms
>> 16/01/25 15:11:12 INFO FileInputDStream: Cleared 0 old files that were
>> older than 1453731012000 ms:
>> 16/01/25 15:11:13 INFO FileInputDStream: Finding new files took 4 ms
>> 16/01/25 15:11:13 INFO FileInputDStream: New files at time 1453731073000
>> ms:
>> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt
>> ---
>> Time: 1453731073000 ms
>> ---
>> 1
>>
>> 16/01/25 15:11:13 INFO FileInputDStream: Cleared 1 old files that were
>> older than 1453731013000 ms: 1453731012000 ms
>> 16/01/25 15:11:13 INFO FileInputDStream: Cleared 0 old files that were
>> older than 1453731013000 ms:
>> 16/01/25 15:11:14 INFO FileInputDStream: Finding new files took 3 ms
>> 16/01/25 15:11:14 INFO FileInputDStream: New files at time 1453731074000
>> ms:
>> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19618.txt
>> ---
>> Time: 1453731074000 ms
>> ---
>> 6288
>>
>>
>> Thanks,
>> Patcharee
>>
>
>


Re: OOM on yarn-cluster mode

2016-01-19 Thread Saisai Shao
You could try increasing the driver memory with "--driver-memory". It looks
like the OOM comes from the driver side, so the simple solution is to give
the driver more memory.

On Tue, Jan 19, 2016 at 1:15 PM, Julio Antonio Soto  wrote:

> Hi,
>
> I'm having trouble when uploading spark jobs in yarn-cluster mode. While
> the job works and completes in yarn-client mode, I hit the following error
> when using spark-submit in yarn-cluster (simplified):
>
> 16/01/19 21:43:31 INFO hive.metastore: Connected to metastore.
> 16/01/19 21:43:32 WARN util.NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> 16/01/19 21:43:32 INFO session.SessionState: Created local directory: 
> /yarn/nm/usercache/julio/appcache/application_1453120455858_0040/container_1453120455858_0040_01_01/tmp/77350a02-d900-4c84-9456-134305044d21_resources
> 16/01/19 21:43:32 INFO session.SessionState: Created HDFS directory: 
> /tmp/hive/nobody/77350a02-d900-4c84-9456-134305044d21
> 16/01/19 21:43:32 INFO session.SessionState: Created local directory: 
> /yarn/nm/usercache/julio/appcache/application_1453120455858_0040/container_1453120455858_0040_01_01/tmp/nobody/77350a02-d900-4c84-9456-134305044d21
> 16/01/19 21:43:32 INFO session.SessionState: Created HDFS directory: 
> /tmp/hive/nobody/77350a02-d900-4c84-9456-134305044d21/_tmp_space.db
> 16/01/19 21:43:32 INFO parquet.ParquetRelation: Listing 
> hdfs://namenode01:8020/user/julio/PFM/CDRs_parquet_np on driver
> 16/01/19 21:43:33 INFO spark.SparkContext: Starting job: table at 
> code.scala:13
> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Got job 0 (table at 
> code.scala:13) with 8 output partitions
> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Final stage: ResultStage 
> 0(table at code.scala:13)
> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Parents of final stage: List()
> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Missing parents: List()
> 16/01/19 21:43:33 INFO scheduler.DAGScheduler: Submitting ResultStage 0 
> (MapPartitionsRDD[1] at table at code.scala:13), which has no missing parents
> Exception in thread "dag-scheduler-event-loop"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "dag-scheduler-event-loop"
> Exception in thread "SparkListenerBus"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "SparkListenerBus"
>
> It happens with whatever program I build, for example:
>
> object MainClass {
>   def main(args: Array[String]): Unit = {
>     val conf = new org.apache.spark.SparkConf().setAppName("test")
>
>     val sc = new org.apache.spark.SparkContext(conf)
>     val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
>     // Drop rows containing nulls, then key each row by its first column.
>     val rdd = sqlContext.read.table("cdrs_np")
>       .na.drop(how = "any")
>       .map(_.toSeq.map(y => y.toString))
>       .map(x => (x.head, x.tail))
>
>     rdd.saveAsTextFile(args(0))
>   }
> }
>
> The command I'm using in spark-submit is the following:
>
> spark-submit --master yarn \
>  --deploy-mode cluster \
>  --driver-memory 1G \
>  --executor-memory 3000m \
>  --executor-cores 1 \
>  --num-executors 8 \
>  --class MainClass \
>  spark-yarn-cluster-test_2.10-0.1.jar \
>  hdfs://namenode01/etl/test
>
> I've got more than enough resources in my cluster in order to run the job
> (in fact, the exact same command works in --deploy-mode client).
>
> I tried to increase yarn.app.mapreduce.am.resource.mb to 2GB, but that
> didn't work. I guess there is another parameter I should tweak, but I have
> not found any info whatsoever in the Internet.
>
> I'm running Spark 1.5.2 and YARN from Hadoop 2.6.0-cdh5.5.1.
>
>
> Any help would be greatly appreciated!
>
> Thank you.
>
> --
> Julio Antonio Soto de Vicente
>


Re: Problem About Worker System.out

2015-12-28 Thread Saisai Shao
Stdout is not sent back to the driver, whether you use Scala or Java. You
must be doing something else that makes you think this is the expected
behavior.
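
A minimal sketch of where the output ends up, using only standard RDD calls (the object name and data are made up for the example). Note also that map is lazy, so a println inside map runs only once an action is called on the result.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WhereDoesPrintlnGo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("println-sketch"))
    val rdd = sc.parallelize(1 to 10)

    // Runs inside tasks: on a cluster the output lands in each executor's stdout log.
    rdd.foreach(a => println(a))

    // Brings a bounded sample back to the driver, then prints in the driver's console.
    rdd.take(5).foreach(println)

    sc.stop()
  }
}
```

On YARN the executor output can then be read from the container logs rather than from the driver console.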

On Mon, Dec 28, 2015 at 5:33 PM, David John 
wrote:

> I have used  Spark *1.4*  for 6 months.  Thanks  all the members of this
> community for your great work.
> I have a question  about the logging issue. I hope this question can be
> solved.
>
> The program is running under this configurations:
> YARN Cluster,
> YARN-client mode.
>
> In *Scala*,
> writing a code like:
> *rdd.map( a => println(a) );  *
>  will get the output about the value of a in our console.
>
> However,
> in *Java (1.7)*,
> writing
> rdd.map(new Function<Integer, Integer>() {
>     @Override
>     public Integer call(Integer a) throws Exception {
>         System.out.println(a);
>         return a;
>     }
> });
> won't get the output in our console.
>
> The configuration is the same.
>
> I have tried this code, but it does not work either:
> rdd.map(new Function<Integer, Integer>() {
>     @Override
>     public Integer call(Integer a) throws Exception {
>         org.apache.log4j.Logger log =
>             org.apache.log4j.Logger.getLogger(this.getClass());
>         log.info(a);
>         log.warn(a);
>         log.error(a);
>         log.fatal(a);
>         return a;
>     }
> });
> No output either:
> final org.apache.log4j.Logger log =
>     org.apache.log4j.Logger.getLogger(this.getClass());
> rdd.map(new Function<Integer, Integer>() {
>     @Override
>     public Integer call(Integer a) throws Exception {
>         log.info(a);
>         log.warn(a);
>         log.error(a);
>         log.fatal(a);
>         return a;
>     }
> });
>
> It seems that stdout on the workers is not sent back to our driver.
> I wonder why it works in Scala but not in Java.
> Is there a simple way to make Java work like Scala?
>
> Thanks.
>


Re: Opening Dynamic Scaling Executors on Yarn

2015-12-27 Thread Saisai Shao
External shuffle service is backward compatible, so if you deployed 1.6
shuffle service on NM, it could serve both 1.5 and 1.6 Spark applications.

Thanks
Saisai

On Mon, Dec 28, 2015 at 2:33 PM, 顾亮亮 <guliangli...@qiyi.com> wrote:

> Is it possible to support both spark-1.5.1 and spark-1.6.0 on one yarn
> cluster?
>
>
>
> *From:* Saisai Shao [mailto:sai.sai.s...@gmail.com]
> *Sent:* Monday, December 28, 2015 2:29 PM
> *To:* Jeff Zhang
> *Cc:* 顾亮亮; user@spark.apache.org; 刘骋昺
> *Subject:* Re: Opening Dynamic Scaling Executors on Yarn
>
>
>
> Replace all the shuffle jars and restart the NodeManager is enough, no
> need to restart NN.
>
>
>
> On Mon, Dec 28, 2015 at 2:05 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
> See
> http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
>
>
>
>
>
>
> On Mon, Dec 28, 2015 at 2:00 PM, 顾亮亮 <guliangli...@qiyi.com> wrote:
>
> Hi all,
>
>
>
> SPARK-3174 (https://issues.apache.org/jira/browse/SPARK-3174) is a useful
> feature to save resources on yarn.
>
> We want to open this feature on our yarn cluster.
>
> I have a question about the version of shuffle service.
>
>
>
> I’m now using spark-1.5.1 (shuffle service).
>
> If I want to upgrade to spark-1.6.0, should I replace the shuffle service
> jar and restart all the namenode on yarn ?
>
>
>
> Thanks a lot.
>
>
>
> Mars
>
>
>
>
>
>
>
> --
>
> Best Regards
>
> Jeff Zhang
>
>
>


Re: Opening Dynamic Scaling Executors on Yarn

2015-12-27 Thread Saisai Shao
Replacing all the shuffle jars and restarting the NodeManagers is enough;
there is no need to restart the NN.

On Mon, Dec 28, 2015 at 2:05 PM, Jeff Zhang  wrote:

> See
> http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
>
>
> On Mon, Dec 28, 2015 at 2:00 PM, 顾亮亮  wrote:
>
>> Hi all,
>>
>>
>>
>> SPARK-3174 (https://issues.apache.org/jira/browse/SPARK-3174) is a
>> useful feature to save resources on yarn.
>>
>> We want to open this feature on our yarn cluster.
>>
>> I have a question about the version of shuffle service.
>>
>>
>>
>> I’m now using spark-1.5.1 (shuffle service).
>>
>> If I want to upgrade to spark-1.6.0, should I replace the shuffle service
>> jar and restart all the namenode on yarn ?
>>
>>
>>
>> Thanks a lot.
>>
>>
>>
>> Mars
>>
>>
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Job Error:Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@130.1.10.108:23600/)

2015-12-25 Thread Saisai Shao
I think SparkContext is thread-safe: you can submit jobs concurrently from
different threads, so the problem you hit might not be related to this. Can
you reproduce the issue every time you submit jobs concurrently, or does it
happen only occasionally?

BTW, I guess you're using an old version of Spark, which may have
concurrency problems; you could try switching to a newer version.
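
For illustration, submitting jobs concurrently from several threads on one SparkContext can be sketched as below. This is an assumed setup rather than the poster's code: four toy jobs driven by Scala Futures on the default global execution context.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import org.apache.spark.{SparkConf, SparkContext}

object ConcurrentJobs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("concurrent-jobs-sketch"))

    // Each Future runs on a pool thread and triggers an independent Spark job
    // on the shared SparkContext.
    val jobs = (1 to 4).map { i =>
      Future { sc.parallelize(1 to 1000).map(_ * i).reduce(_ + _) }
    }

    // Wait for all jobs to finish before stopping the context.
    val results = Await.result(Future.sequence(jobs), 10.minutes)
    results.zipWithIndex.foreach { case (sum, idx) => println(s"job ${idx + 1}: $sum") }

    sc.stop()
  }
}
```

If the 12 jobs in question are separate applications, each with its own driver, this pattern does not apply directly; it only shows the single-context case described above.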

Thanks
Saisai

On Fri, Dec 25, 2015 at 2:26 PM, donhoff_h <165612...@qq.com> wrote:

> Hi,folks
>
> I wrote some spark jobs, and these jobs run successfully when I run
> them one by one. But if I run them concurrently, for example 12 jobs
> in parallel, I hit the following error. Could anybody tell me what
> causes this and how to solve it? Many thanks!
>
> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for:
> ActorSelection[Anchor(akka.tcp://sparkDriver@130.1.10.108:23600/),
> Path(/user/MapOutputTracker)]
> at
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> at
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> at
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> at
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> at
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> at
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
> at
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
> at
> akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>

