Re: [Spark 2.0.0] error when unioning to an empty dataset

2016-10-20 Thread Efe Selcuk
Thanks for the response. What do you mean by "semantically" the same?
They're both Datasets of the same type, which is a case class, so I would
expect compile-time integrity of the data. Is there a situation where this
wouldn't be the case?

Interestingly enough, if I instead create an empty rdd with
sparkContext.emptyRDD of the same case class type, it works!

So something like:
var data = spark.sparkContext.emptyRDD[SomeData]

// loop
  data = data.union(someCode.thatReturnsADataset().rdd)
// end loop

data.toDS //so I can union it to the actual Dataset I have elsewhere
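As an aside, a minimal sketch of an alternative that avoids the mutable var and
the empty seed entirely, by collecting the per-iteration Datasets and reducing
them with union ("inputs" here is just a placeholder for whatever drives the loop):

  import org.apache.spark.sql.Dataset

  // hypothetical: build one Dataset per ingested source, then combine them
  val parts: Seq[Dataset[SomeData]] = inputs.map(_ => someCode.thatReturnsADataset())
  // reduceOption yields None when nothing was ingested, instead of needing an empty seed
  val combined: Option[Dataset[SomeData]] = parts.reduceOption(_ union _)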

On Thu, Oct 20, 2016 at 8:34 PM Agraj Mangal  wrote:

I believe this normally comes when Spark is unable to perform union due to
"difference" in schema of the operands. Can you check if the schema of both
the datasets are semantically same ?

On Tue, Oct 18, 2016 at 9:06 AM, Efe Selcuk  wrote:

Bump!

On Thu, Oct 13, 2016 at 8:25 PM Efe Selcuk  wrote:

I have a use case where I want to build a dataset based off of
conditionally available data. I thought I'd do something like this:

case class SomeData( ... ) // parameters are basic encodable types like
strings and BigDecimals

var data = spark.emptyDataset[SomeData]

// loop, determining what data to ingest and process into datasets
  data = data.union(someCode.thatReturnsADataset)
// end loop

However I get a runtime exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
unresolved operator 'Union;
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.(Dataset.scala:161)
at org.apache.spark.sql.Dataset.(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Granted, I'm new at Spark so this might be an anti-pattern, so I'm open to
suggestions. However it doesn't seem like I'm doing anything incorrect
here, the types are correct. Searching for this error online returns
results seemingly about working in dataframes and having mismatching
schemas or a different order of fields, and it seems like bugfixes have
gone into place for those cases.

Thanks in advance.
Efe




-- 
Thanks & Regards,
Agraj Mangal


Re: [Spark 2.0.0] error when unioning to an empty dataset

2016-10-20 Thread Agraj Mangal
I believe this normally happens when Spark is unable to perform the union due to
a difference in the schemas of the operands. Can you check whether the schemas of
the two datasets are semantically the same?
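For example, a quick way to eyeball this (ds1 and ds2 are placeholders for your
two Datasets):

  ds1.printSchema()
  ds2.printSchema()
  // differences in field order, types, or nullability will show up here
  println(ds1.schema == ds2.schema)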

On Tue, Oct 18, 2016 at 9:06 AM, Efe Selcuk  wrote:

> Bump!
>
> On Thu, Oct 13, 2016 at 8:25 PM Efe Selcuk  wrote:
>
>> I have a use case where I want to build a dataset based off of
>> conditionally available data. I thought I'd do something like this:
>>
>> case class SomeData( ... ) // parameters are basic encodable types like
>> strings and BigDecimals
>>
>> var data = spark.emptyDataset[SomeData]
>>
>> // loop, determining what data to ingest and process into datasets
>>   data = data.union(someCode.thatReturnsADataset)
>> // end loop
>>
>> However I get a runtime exception:
>>
>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>> unresolved operator 'Union;
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.
>> failAnalysis(CheckAnalysis.scala:40)
>> at org.apache.spark.sql.catalyst.analysis.Analyzer.
>> failAnalysis(Analyzer.scala:58)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
>> anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
>> anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
>> TreeNode.scala:126)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.
>> checkAnalysis(CheckAnalysis.scala:67)
>> at org.apache.spark.sql.catalyst.analysis.Analyzer.
>> checkAnalysis(Analyzer.scala:58)
>> at org.apache.spark.sql.execution.QueryExecution.
>> assertAnalyzed(QueryExecution.scala:49)
>> at org.apache.spark.sql.Dataset.(Dataset.scala:161)
>> at org.apache.spark.sql.Dataset.(Dataset.scala:167)
>> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
>> at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
>> at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)
>>
>> Granted, I'm new at Spark so this might be an anti-pattern, so I'm open
>> to suggestions. However it doesn't seem like I'm doing anything incorrect
>> here, the types are correct. Searching for this error online returns
>> results seemingly about working in dataframes and having mismatching
>> schemas or a different order of fields, and it seems like bugfixes have
>> gone into place for those cases.
>>
>> Thanks in advance.
>> Efe
>>
>>


-- 
Thanks & Regards,
Agraj Mangal


Re: spark pi example fail on yarn

2016-10-20 Thread Li Li
I set yarn.nodemanager.vmem-check-enabled to false in yarn-site.xml, and now it
works for both yarn-client and spark-shell.
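For reference, the relevant yarn-site.xml entries look roughly like this; note
that disabling the check only hides the symptom, and the vmem-pmem-ratio value
below is just an example of the alternative of giving containers more
virtual-memory headroom:

  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>
  <!-- alternative: raise the ratio instead of disabling the check (default is 2.1) -->
  <property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>4</value>
  </property>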

On Fri, Oct 21, 2016 at 10:59 AM, Li Li  wrote:
> I found a warn in nodemanager log. is the virtual memory exceed? how
> should I config yarn to solve this problem?
>
> 2016-10-21 10:41:12,588 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> Memory usage of ProcessTree 20299 for container-id
> container_1477017445921_0001_02_01: 335.1 MB of 1 GB physical
> memory used; 2.2 GB of 2.1 GB virtual memory used
> 2016-10-21 10:41:12,589 WARN
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> Process tree for container: container_1477017445921_0001_02_01 has
> processes older than 1 iteration running over the configured limit.
> Limit=2254857728, current usage = 2338873344
>
> On Fri, Oct 21, 2016 at 8:49 AM, Saisai Shao  wrote:
>> It is not Spark has difficulty to communicate with YARN, it simply means AM
>> is exited with FINISHED state.
>>
>> I'm guessing it might be related to memory constraints for container, please
>> check the yarn RM and NM logs to find out more details.
>>
>> Thanks
>> Saisai
>>
>> On Fri, Oct 21, 2016 at 8:14 AM, Xi Shen  wrote:
>>>
>>> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
>>> application has already exited with state FINISHED!
>>>
>>>  From this, I think it is spark has difficult communicating with YARN. You
>>> should check your Spark log.
>>>
>>>
>>> On Fri, Oct 21, 2016 at 8:06 AM Li Li  wrote:

 which log file should I

 On Thu, Oct 20, 2016 at 10:02 PM, Saisai Shao 
 wrote:
 > Looks like ApplicationMaster is killed by SIGTERM.
 >
 > 16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
 > 16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:
 >
 > This container may be killed by yarn NodeManager or other processes,
 > you'd
 > better check yarn log to dig out more details.
 >
 > Thanks
 > Saisai
 >
 > On Thu, Oct 20, 2016 at 6:51 PM, Li Li  wrote:
 >>
 >> I am setting up a small yarn/spark cluster. hadoop/yarn version is
 >> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
 >> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
 >> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
 >> org.apache.spark.examples.SparkPi --master yarn-client
 >> examples/jars/spark-examples_2.11-2.0.1.jar 1
 >> it fails and the first error is:
 >> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
 >> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
 >> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
 >> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO
 >> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
 >> registered as NettyRpcEndpointRef(null)
 >> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
 >> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
 >> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
 >> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
 >> /proxy/application_1476957324184_0002
 >> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
 >> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 >> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
 >> SchedulerBackend is ready for scheduling beginning after waiting
 >> maxRegisteredResourcesWaitingTime: 3(ms)
 >> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
 >> SparkContext, some configuration may not take effect.
 >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
 >> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
 >> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
 >> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
 >>
 >> o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
 >> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
 >> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
 >> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
 >> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
 >> SparkPi.scala:38
 >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
 >> SparkPi.scala:38) with 1 output 

Re: spark pi example fail on yarn

2016-10-20 Thread Li Li
I found a warning in the NodeManager log. Is the virtual memory limit being
exceeded? How should I configure YARN to solve this problem?

2016-10-21 10:41:12,588 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
Memory usage of ProcessTree 20299 for container-id
container_1477017445921_0001_02_01: 335.1 MB of 1 GB physical
memory used; 2.2 GB of 2.1 GB virtual memory used
2016-10-21 10:41:12,589 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
Process tree for container: container_1477017445921_0001_02_01 has
processes older than 1 iteration running over the configured limit.
Limit=2254857728, current usage = 2338873344

On Fri, Oct 21, 2016 at 8:49 AM, Saisai Shao  wrote:
> It is not Spark has difficulty to communicate with YARN, it simply means AM
> is exited with FINISHED state.
>
> I'm guessing it might be related to memory constraints for container, please
> check the yarn RM and NM logs to find out more details.
>
> Thanks
> Saisai
>
> On Fri, Oct 21, 2016 at 8:14 AM, Xi Shen  wrote:
>>
>> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
>> application has already exited with state FINISHED!
>>
>>  From this, I think it is spark has difficult communicating with YARN. You
>> should check your Spark log.
>>
>>
>> On Fri, Oct 21, 2016 at 8:06 AM Li Li  wrote:
>>>
>>> which log file should I
>>>
>>> On Thu, Oct 20, 2016 at 10:02 PM, Saisai Shao 
>>> wrote:
>>> > Looks like ApplicationMaster is killed by SIGTERM.
>>> >
>>> > 16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
>>> > 16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:
>>> >
>>> > This container may be killed by yarn NodeManager or other processes,
>>> > you'd
>>> > better check yarn log to dig out more details.
>>> >
>>> > Thanks
>>> > Saisai
>>> >
>>> > On Thu, Oct 20, 2016 at 6:51 PM, Li Li  wrote:
>>> >>
>>> >> I am setting up a small yarn/spark cluster. hadoop/yarn version is
>>> >> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
>>> >> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
>>> >> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
>>> >> org.apache.spark.examples.SparkPi --master yarn-client
>>> >> examples/jars/spark-examples_2.11-2.0.1.jar 1
>>> >> it fails and the first error is:
>>> >> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
>>> >> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
>>> >> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
>>> >> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
>>> >> 16/10/20 18:12:12 INFO
>>> >> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
>>> >> registered as NettyRpcEndpointRef(null)
>>> >> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
>>> >> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
>>> >> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
>>> >> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
>>> >> /proxy/application_1476957324184_0002
>>> >> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
>>> >> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
>>> >> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
>>> >> SchedulerBackend is ready for scheduling beginning after waiting
>>> >> maxRegisteredResourcesWaitingTime: 3(ms)
>>> >> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
>>> >> SparkContext, some configuration may not take effect.
>>> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> >> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
>>> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> >> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
>>> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> >> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
>>> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> >>
>>> >> o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,null,AVAILABLE}
>>> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> >> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
>>> >> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
>>> >> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
>>> >> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
>>> >> SparkPi.scala:38
>>> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
>>> >> SparkPi.scala:38) with 1 output partitions
>>> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
>>> >> ResultStage 0 (reduce at SparkPi.scala:38)
>>> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage:
>>> >> List()
>>> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: 

Re: spark pi example fail on yarn

2016-10-20 Thread Saisai Shao
It is not that Spark has difficulty communicating with YARN; it simply means the
AM exited with the FINISHED state.

I'm guessing it might be related to memory constraints for the container; please
check the YARN RM and NM logs to find out more details.

Thanks
Saisai

On Fri, Oct 21, 2016 at 8:14 AM, Xi Shen  wrote:

> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
> application has already exited with state FINISHED!
>
>  From this, I think it is spark has difficult communicating with YARN. You
> should check your Spark log.
>
>
> On Fri, Oct 21, 2016 at 8:06 AM Li Li  wrote:
>
> which log file should I
>
> On Thu, Oct 20, 2016 at 10:02 PM, Saisai Shao 
> wrote:
> > Looks like ApplicationMaster is killed by SIGTERM.
> >
> > 16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
> > 16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:
> >
> > This container may be killed by yarn NodeManager or other processes,
> you'd
> > better check yarn log to dig out more details.
> >
> > Thanks
> > Saisai
> >
> > On Thu, Oct 20, 2016 at 6:51 PM, Li Li  wrote:
> >>
> >> I am setting up a small yarn/spark cluster. hadoop/yarn version is
> >> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
> >> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
> >> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
> >> org.apache.spark.examples.SparkPi --master yarn-client
> >> examples/jars/spark-examples_2.11-2.0.1.jar 1
> >> it fails and the first error is:
> >> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
> >> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
> >> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO
> >> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
> >> registered as NettyRpcEndpointRef(null)
> >> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
> >> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
> >> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
> >> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
> >> /proxy/application_1476957324184_0002
> >> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
> >> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
> >> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
> >> SchedulerBackend is ready for scheduling beginning after waiting
> >> maxRegisteredResourcesWaitingTime: 3(ms)
> >> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
> >> SparkContext, some configuration may not take effect.
> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,
> null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> >> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
> >> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
> >> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
> >> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
> >> SparkPi.scala:38
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
> >> SparkPi.scala:38) with 1 output partitions
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
> >> ResultStage 0 (reduce at SparkPi.scala:38)
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage:
> >> List()
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
> >> 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
> >> missing parents
> >> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
> >> values in memory (estimated size 1832.0 B, free 366.3 MB)
> >> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0_piece0
> >> stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB)
> >> 16/10/20 18:12:13 INFO storage.BlockManagerInfo: Added
> >> broadcast_0_piece0 in memory on 10.161.219.189:39161 (size: 1169.0 B,
> >> free: 366.3 MB)
> >> 16/10/20 18:12:13 INFO spark.SparkContext: Created broadcast 0 from
> >> broadcast at DAGScheduler.scala:1012
> >> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting 1
> >> missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
> >> SparkPi.scala:34)
> >> 16/10/20 18:12:13 INFO cluster.YarnScheduler: Adding 

Re: spark pi example fail on yarn

2016-10-20 Thread Xi Shen
16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
application has already exited with state FINISHED!

From this, I think Spark is having difficulty communicating with YARN. You
should check your Spark log.


On Fri, Oct 21, 2016 at 8:06 AM Li Li  wrote:

which log file should I

On Thu, Oct 20, 2016 at 10:02 PM, Saisai Shao 
wrote:
> Looks like ApplicationMaster is killed by SIGTERM.
>
> 16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
> 16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:
>
> This container may be killed by yarn NodeManager or other processes, you'd
> better check yarn log to dig out more details.
>
> Thanks
> Saisai
>
> On Thu, Oct 20, 2016 at 6:51 PM, Li Li  wrote:
>>
>> I am setting up a small yarn/spark cluster. hadoop/yarn version is
>> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
>> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
>> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
>> org.apache.spark.examples.SparkPi --master yarn-client
>> examples/jars/spark-examples_2.11-2.0.1.jar 1
>> it fails and the first error is:
>> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
>> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
>> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO
>> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
>> registered as NettyRpcEndpointRef(null)
>> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
>> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
>> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
>> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
>> /proxy/application_1476957324184_0002
>> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
>> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
>> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
>> SchedulerBackend is ready for scheduling beginning after waiting
>> maxRegisteredResourcesWaitingTime: 3(ms)
>> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
>> SparkContext, some configuration may not take effect.
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@2cc75074
{/SQL/execution/json,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
>> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
>> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
>> SparkPi.scala:38
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
>> SparkPi.scala:38) with 1 output partitions
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
>> ResultStage 0 (reduce at SparkPi.scala:38)
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage:
>> List()
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
>> 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
>> missing parents
>> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
>> values in memory (estimated size 1832.0 B, free 366.3 MB)
>> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0_piece0
>> stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB)
>> 16/10/20 18:12:13 INFO storage.BlockManagerInfo: Added
>> broadcast_0_piece0 in memory on 10.161.219.189:39161 (size: 1169.0 B,
>> free: 366.3 MB)
>> 16/10/20 18:12:13 INFO spark.SparkContext: Created broadcast 0 from
>> broadcast at DAGScheduler.scala:1012
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting 1
>> missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
>> SparkPi.scala:34)
>> 16/10/20 18:12:13 INFO cluster.YarnScheduler: Adding task set 0.0 with
>> 1 tasks
>> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
>> application has already exited with state FINISHED!
>> 16/10/20 18:12:14 INFO server.ServerConnector: Stopped
>> ServerConnector@389adf1d{HTTP/1.1}{0.0.0.0:4040}
>> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
>> o.s.j.s.ServletContextHandler@841e575
{/stages/stage/kill,null,UNAVAILABLE}
>> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
>> 

[Spark ML] Using GBTClassifier in OneVsRest

2016-10-20 Thread ansari
It appears as if the inheritance hierarchy doesn't allow GBTClassifiers to be
used as the binary classifier in a OneVsRest trainer. Is there a simple way
to use gradient-boosted trees for multiclass (not binary) problems?

Specifically, it complains that GBTClassifier doesn't inherit from
Classifier[_, _, _].

I'm using Spark 2.0.1:

val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
  .setMaxIter(10)
  .setMaxDepth(10)

val ovr = new OneVsRest()
  .setClassifier(gbt)

fails saying

error: type mismatch;
 found   : org.apache.spark.ml.classification.GBTClassifier
 required: org.apache.spark.ml.classification.Classifier[_, _, _]
       setClassifier(gbt)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ML-Using-GBTClassifier-in-OneVsRest-tp27933.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark pi example fail on yarn

2016-10-20 Thread Li Li
which log file should I

On Thu, Oct 20, 2016 at 10:02 PM, Saisai Shao  wrote:
> Looks like ApplicationMaster is killed by SIGTERM.
>
> 16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
> 16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:
>
> This container may be killed by yarn NodeManager or other processes, you'd
> better check yarn log to dig out more details.
>
> Thanks
> Saisai
>
> On Thu, Oct 20, 2016 at 6:51 PM, Li Li  wrote:
>>
>> I am setting up a small yarn/spark cluster. hadoop/yarn version is
>> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
>> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
>> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
>> org.apache.spark.examples.SparkPi --master yarn-client
>> examples/jars/spark-examples_2.11-2.0.1.jar 1
>> it fails and the first error is:
>> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
>> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
>> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO
>> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
>> registered as NettyRpcEndpointRef(null)
>> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
>> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
>> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
>> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
>> /proxy/application_1476957324184_0002
>> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
>> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
>> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
>> SchedulerBackend is ready for scheduling beginning after waiting
>> maxRegisteredResourcesWaitingTime: 3(ms)
>> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
>> SparkContext, some configuration may not take effect.
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
>> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
>> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
>> SparkPi.scala:38
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
>> SparkPi.scala:38) with 1 output partitions
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
>> ResultStage 0 (reduce at SparkPi.scala:38)
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage:
>> List()
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
>> 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
>> missing parents
>> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
>> values in memory (estimated size 1832.0 B, free 366.3 MB)
>> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0_piece0
>> stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB)
>> 16/10/20 18:12:13 INFO storage.BlockManagerInfo: Added
>> broadcast_0_piece0 in memory on 10.161.219.189:39161 (size: 1169.0 B,
>> free: 366.3 MB)
>> 16/10/20 18:12:13 INFO spark.SparkContext: Created broadcast 0 from
>> broadcast at DAGScheduler.scala:1012
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting 1
>> missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
>> SparkPi.scala:34)
>> 16/10/20 18:12:13 INFO cluster.YarnScheduler: Adding task set 0.0 with
>> 1 tasks
>> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
>> application has already exited with state FINISHED!
>> 16/10/20 18:12:14 INFO server.ServerConnector: Stopped
>> ServerConnector@389adf1d{HTTP/1.1}{0.0.0.0:4040}
>> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
>> o.s.j.s.ServletContextHandler@841e575{/stages/stage/kill,null,UNAVAILABLE}
>> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
>> o.s.j.s.ServletContextHandler@66629f63{/api,null,UNAVAILABLE}
>> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
>> o.s.j.s.ServletContextHandler@2b62442c{/,null,UNAVAILABLE}
>>
>>
>> I also use yarn log to get logs from yarn(total log is very lengthy in
>> attachement):
>> 16/10/20 18:12:03 INFO yarn.ExecutorRunnable:
>>
>> 

Re: spark pi example fail on yarn

2016-10-20 Thread Li Li
which log file should I check?

On Thu, Oct 20, 2016 at 11:32 PM, Amit Tank
 wrote:
> I recently started learning spark so I may be completely wrong here but I
> was facing similar problem with sparkpi on yarn. After changing yarn to
> cluster mode it worked perfectly fine.
>
> Thank you,
> Amit
>
>
> On Thursday, October 20, 2016, Saisai Shao  wrote:
>>
>> Looks like ApplicationMaster is killed by SIGTERM.
>>
>> 16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
>> 16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:
>>
>> This container may be killed by yarn NodeManager or other processes, you'd
>> better check yarn log to dig out more details.
>>
>> Thanks
>> Saisai
>>
>> On Thu, Oct 20, 2016 at 6:51 PM, Li Li  wrote:
>>>
>>> I am setting up a small yarn/spark cluster. hadoop/yarn version is
>>> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
>>> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
>>> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
>>> org.apache.spark.examples.SparkPi --master yarn-client
>>> examples/jars/spark-examples_2.11-2.0.1.jar 1
>>> it fails and the first error is:
>>> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
>>> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
>>> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
>>> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO
>>> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
>>> registered as NettyRpcEndpointRef(null)
>>> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
>>> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
>>> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
>>> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
>>> /proxy/application_1476957324184_0002
>>> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
>>> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
>>> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
>>> SchedulerBackend is ready for scheduling beginning after waiting
>>> maxRegisteredResourcesWaitingTime: 3(ms)
>>> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
>>> SparkContext, some configuration may not take effect.
>>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>>
>>> o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
>>> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
>>> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
>>> SparkPi.scala:38
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
>>> SparkPi.scala:38) with 1 output partitions
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
>>> ResultStage 0 (reduce at SparkPi.scala:38)
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage:
>>> List()
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
>>> 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
>>> missing parents
>>> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
>>> values in memory (estimated size 1832.0 B, free 366.3 MB)
>>> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0_piece0
>>> stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB)
>>> 16/10/20 18:12:13 INFO storage.BlockManagerInfo: Added
>>> broadcast_0_piece0 in memory on 10.161.219.189:39161 (size: 1169.0 B,
>>> free: 366.3 MB)
>>> 16/10/20 18:12:13 INFO spark.SparkContext: Created broadcast 0 from
>>> broadcast at DAGScheduler.scala:1012
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting 1
>>> missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
>>> SparkPi.scala:34)
>>> 16/10/20 18:12:13 INFO cluster.YarnScheduler: Adding task set 0.0 with
>>> 1 tasks
>>> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
>>> application has already exited with state FINISHED!
>>> 16/10/20 18:12:14 INFO server.ServerConnector: Stopped
>>> ServerConnector@389adf1d{HTTP/1.1}{0.0.0.0:4040}
>>> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
>>>
>>> o.s.j.s.ServletContextHandler@841e575{/stages/stage/kill,null,UNAVAILABLE}

Re: spark pi example fail on yarn

2016-10-20 Thread Li Li
Yes, when I use yarn-cluster mode it works correctly. What's wrong with
yarn-client? The spark-shell also does not work, because it runs in client
mode. Is there any solution for this?

On Thu, Oct 20, 2016 at 11:32 PM, Amit Tank
 wrote:
> I recently started learning spark so I may be completely wrong here but I
> was facing similar problem with sparkpi on yarn. After changing yarn to
> cluster mode it worked perfectly fine.
>
> Thank you,
> Amit
>
>
> On Thursday, October 20, 2016, Saisai Shao  wrote:
>>
>> Looks like ApplicationMaster is killed by SIGTERM.
>>
>> 16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
>> 16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:
>>
>> This container may be killed by yarn NodeManager or other processes, you'd
>> better check yarn log to dig out more details.
>>
>> Thanks
>> Saisai
>>
>> On Thu, Oct 20, 2016 at 6:51 PM, Li Li  wrote:
>>>
>>> I am setting up a small yarn/spark cluster. hadoop/yarn version is
>>> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
>>> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
>>> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
>>> org.apache.spark.examples.SparkPi --master yarn-client
>>> examples/jars/spark-examples_2.11-2.0.1.jar 1
>>> it fails and the first error is:
>>> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
>>> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
>>> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
>>> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO
>>> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
>>> registered as NettyRpcEndpointRef(null)
>>> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
>>> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
>>> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
>>> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
>>> /proxy/application_1476957324184_0002
>>> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
>>> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
>>> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
>>> SchedulerBackend is ready for scheduling beginning after waiting
>>> maxRegisteredResourcesWaitingTime: 3(ms)
>>> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
>>> SparkContext, some configuration may not take effect.
>>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>>
>>> o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>>> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
>>> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
>>> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
>>> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
>>> SparkPi.scala:38
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
>>> SparkPi.scala:38) with 1 output partitions
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
>>> ResultStage 0 (reduce at SparkPi.scala:38)
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage:
>>> List()
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
>>> 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
>>> missing parents
>>> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
>>> values in memory (estimated size 1832.0 B, free 366.3 MB)
>>> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0_piece0
>>> stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB)
>>> 16/10/20 18:12:13 INFO storage.BlockManagerInfo: Added
>>> broadcast_0_piece0 in memory on 10.161.219.189:39161 (size: 1169.0 B,
>>> free: 366.3 MB)
>>> 16/10/20 18:12:13 INFO spark.SparkContext: Created broadcast 0 from
>>> broadcast at DAGScheduler.scala:1012
>>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting 1
>>> missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
>>> SparkPi.scala:34)
>>> 16/10/20 18:12:13 INFO cluster.YarnScheduler: Adding task set 0.0 with
>>> 1 tasks
>>> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
>>> application has already exited with state FINISHED!
>>> 16/10/20 18:12:14 INFO server.ServerConnector: Stopped
>>> ServerConnector@389adf1d{HTTP/1.1}{0.0.0.0:4040}
>>> 16/10/20 

Re: Equivalent Parquet File Repartitioning Benefits for Join/Shuffle?

2016-10-20 Thread adam kramer
I believe what I am looking for is DataFrameWriter.bucketBy, which would allow
bucketing into physical parquet files by the desired columns. My question then
is: can DataFrames/Datasets take advantage of this physical bucketing when the
parquet files are read back, for something like a self-join on the bucketed
columns?
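For what it's worth, a minimal sketch of the bucketed-write side, building on the
df1_part from the snippet quoted below (in Spark 2.0/2.1 bucketBy only works
together with saveAsTable, not a plain path-based save, and the table name here
is an assumption):

  import org.apache.spark.sql.SaveMode

  // illustrative: persist df1_part bucketed (and sorted) by the join columns
  df1_part.write
    .format("parquet")
    .bucketBy(500, "a", "b", "c")
    .sortBy("a", "b", "c")
    .mode(SaveMode.Overwrite)
    .saveAsTable("a_b_c_bucketed_1")

Whether a later read of that table can actually skip the shuffle for the
self-join is exactly the part I am unsure about, since it depends on the
bucketing metadata kept in the catalog.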

On Tue, Oct 18, 2016 at 10:59 PM, adam kramer  wrote:
> Hello All,
>
> I’m trying to improve join efficiency within (self-join) and across
> data sets loaded from different parquet files primarily due to a
> multi-stage data ingestion environment.
>
> Are there specific benefits to shuffling efficiency (e.g. no network
> transmission) if the parquet files are written from equivalently
> partitioned datasets (i.e. same partition columns and number of
> partitions)?
>
> A self-join and multi-join Scala shell example that uses the method in 
> question:
> % val df1 = 
> sqlContext.read.parquet("hdfs://someserver:9010/default-partitioned-a-z-file-1")
> % val df2 = 
> sqlContext.read.parquet("hdfs://someserver:9010/default-partitioned-a-z-file-2")
> % val df1_part = df1.repartition(500,$”a",$”b",$”c")
> % val df2_part = df2.repartition(500,$”a",$”b",$”c")
> % 
> df1_part.write.format("parquet").mode(SaveMode.Overwrite).save("hdfs://someserver:9010/a-b-c-partitioned-file-1”)
> % 
> df2_part.write.format("parquet").mode(SaveMode.Overwrite).save("hdfs://someserver:9010/a-b-c-partitioned-file-2”)
> % val reloaded_df1_part =
> sqlContext.read.parquet("hdfs://someserver:9010/a-b-c-partitioned-file-1”)
> % val reloaded_df2_part =
> sqlContext.read.parquet("hdfs://someserver:9010/a-b-c-partitioned-file-2”)
> % val superfast_self_join =
> reloaded_df1_part.join(reloaded_df1_part.select($”a”,$”b”,$”c”,$”d”.as(“right_d”)),
> Seq(“a”,”b”,”c”))
> % val superfast_multi_join =
> reloaded_df1_part.join(reloaded_df2_part.select($”a”,$”b”,$”c”,$”not_in_df1”),
> Seq(“a”,”b”,”c”))
> % superfast_self_join.count
> % superfast_multi_join.count
>
> Ignoring the time necessary to repartition and assuming good
> partitioning cardinality (while joining groups of rows), are there
> performance benefits to this approach for joins ‘superfast_self_join'
> and 'superfast_multi_join'? Or is there no benefit as the partitioner
> information is lost upon persistence/write to parquet?
>
> Note I am currently using Spark 1.6.3 and moving to 2.0.1 in the near future.
>
> Thank you for any insights.
>
> Adam

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark SQL parallelize

2016-10-20 Thread Selvam Raman
Hi,

I have 40+ structured datasets stored in an S3 bucket as parquet files.

I am going to use 20 of those tables in this use case.

There is a main table which drives the whole flow; it contains about 1k
records.

For every record in the main table, I need to process the rest of the
tables (joins and group-bys that depend on fields of the main table).

How can I parallelize this processing?

What I did was read the main table and create a toLocalIterator over the
DataFrame, then do the rest of the processing, but this runs one record at
a time.

Please share your ideas.

Thank you.
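PS: to make it concrete, what I am hoping for is something along the lines of
this rough sketch, where the per-record work becomes a single broadcast join
instead of a driver-side loop (paths, key and column names are placeholders):

  import org.apache.spark.sql.functions.broadcast

  val main   = spark.read.parquet("s3://bucket/main_table")      // ~1k records, small enough to broadcast
  val detail = spark.read.parquet("s3://bucket/detail_table_1")

  // one distributed join + group-by instead of iterating main-table rows one by one
  val result = detail
    .join(broadcast(main), Seq("main_key"))
    .groupBy("main_key", "some_field")
    .count()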


RDD groupBy() then random sort each group ?

2016-10-20 Thread Yang
In my application, I group training samples by their model_id
(the input table contains training samples for 100k different models),
and each group ends up having about 1 million training samples.

I then feed each group of samples to a little logistic regression solver
(SGD), but SGD requires the input data to be shuffled randomly (so that
positive and negative samples are evenly distributed), so now I do
something like

my_input_rdd.groupBy(x => x.model_id).map { x =>
  val (model_id, group_of_rows) = x
  (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))
}.map(x => (x._1, train_sgd(x._2)))


The issue is that on the 3rd row above, I had to explicitly call toSeq() on
group_of_rows (which is an Iterable, not a Seq) in order to shuffle it. That
means I have to load the entire 1 million rows into memory, and in practice
I've seen my tasks OOM and GC time go crazy (about 50% of total run time). I
suspect this toSeq() is the reason, since doing a simple count() on the
groupBy() result works fine.

I am planning to shuffle my_input_rdd first, then groupBy(), and skip the
toSeq()/shuffle step. Intuitively the input RDD is already shuffled, so UNLESS
groupBy() does some sorting, the rows in each group SHOULD remain shuffled,
but overall this approach feels rather flimsy.

any ideas to do this more reliably?

thanks!


Re: Spark 2.0 with Kafka 0.10 exception

2016-10-20 Thread Cody Koeninger
Right on, I put in a PR to make a note of that in the docs.
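For anyone who lands on this thread later, these are ordinary Kafka consumer
settings passed through kafkaParams; a minimal sketch (values are illustrative
only, and broker-side limits such as group.max.session.timeout.ms may also need
adjusting):

  import org.apache.kafka.common.serialization.StringDeserializer

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "broker1:9092",                // placeholder
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "app2-consumer",                        // placeholder
    // keep these comfortably above the 60s batch interval discussed below
    "session.timeout.ms" -> "120000",                     // illustrative value
    "heartbeat.interval.ms" -> "30000"                    // illustrative value
  )
  // passed to KafkaUtils.createDirectStream via ConsumerStrategies.Subscribe as usual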

On Thu, Oct 20, 2016 at 12:13 PM, Srikanth  wrote:
> Yeah, setting those params helped.
>
> On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger  wrote:
>>
>> 60 seconds for a batch is above the default settings in kafka related
>> to heartbeat timeouts, so that might be related.  Have you tried
>> tweaking session.timeout.ms, heartbeat.interval.ms, or related
>> configs?
>>
>> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth  wrote:
>> > Bringing this thread back as I'm seeing this exception on a production
>> > kafka
>> > cluster.
>> >
>> > I have two Spark streaming apps reading the same topic. App1 has batch
>> > interval 2secs and app2 has 60secs.
>> > Both apps are running on the same cluster on similar hardware. I see
>> > this
>> > exception only in app2 and fairly consistently.
>> >
>> > Difference I see between the apps is
>> > App1
>> >   spark.streaming.kafka.maxRatePerPartition, 6000
>> >   batch interval 2 secs
>> > App2
>> >   spark.streaming.kafka.maxRatePerPartition, 1
>> >   batch interval 60 secs
>> >
>> > All other kafka/spark related configs are same for both apps.
>> >   spark.streaming.kafka.consumer.poll.ms = 4096
>> >   spark.streaming.backpressure.enabled = true
>> >
>> > Not sure if pre-fetching or caching is messing things up.
>> >
>> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0
>> > (TID
>> > 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError:
>> > assertion
>> > failed: Failed to get records for spark-executor-StreamingEventSplitProd
>> > mt_event 6 49091480 after polling for 4096
>> > at scala.Predef$.assert(Predef.scala:170)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> > at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)
>> >
>> >
>> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger 
>> > wrote:
>> >>
>> >> That's not what I would have expected to happen with a lower cache
>> >> setting, but in general disabling the cache isn't something you want
>> >> to do with the new kafka consumer.
>> >>
>> >>
>> >> As far as the original issue, are you seeing those polling errors
>> >> intermittently, or consistently?  From your description, it sounds
>> >> like retry is working correctly.
>> >>
>> >>
>> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth  wrote:
>> >> > Setting those two results in below exception.
>> >> > No.of executors < no.of partitions. Could that be triggering this?
>> >> >
>> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0
>> >> > (TID 9)
>> >> > java.util.ConcurrentModificationException: KafkaConsumer is not safe
>> >> > for
>> >> > multi-threaded access
>> >> > at
>> >> >
>> >> >
>> >> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
>> >> > at
>> >> >
>> >> >
>> >> > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
>> >> > at
>> >> >
>> >> >
>> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> >> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
>> >> > at java.util.HashMap.putVal(Unknown Source)
>> >> > at java.util.HashMap.put(Unknown Source)
>> >> > at
>> >> >
>> >> >
>> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
>> >> > at
>> >> >
>> >> >
>> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(KafkaRDD.scala:210)
>> >> > at
>> >> >
>> >> > org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >> > at
>> >> >
>> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >> > at
>> >> >
>> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> >> > at
>> >> >
>> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> >> > at 

Re: Spark 2.0 with Kafka 0.10 exception

2016-10-20 Thread Srikanth
Yeah, setting those params helped.

On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger  wrote:

> 60 seconds for a batch is above the default settings in kafka related
> to heartbeat timeouts, so that might be related.  Have you tried
> tweaking session.timeout.ms, heartbeat.interval.ms, or related
> configs?
>
> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth  wrote:
> > Bringing this thread back as I'm seeing this exception on a production
> kafka
> > cluster.
> >
> > I have two Spark streaming apps reading the same topic. App1 has batch
> > interval 2secs and app2 has 60secs.
> > Both apps are running on the same cluster on similar hardware. I see this
> > exception only in app2 and fairly consistently.
> >
> > Difference I see between the apps is
> > App1
> >   spark.streaming.kafka.maxRatePerPartition, 6000
> >   batch interval 2 secs
> > App2
> >   spark.streaming.kafka.maxRatePerPartition, 1
> >   batch interval 60 secs
> >
> > All other kafka/spark related configs are same for both apps.
> >   spark.streaming.kafka.consumer.poll.ms = 4096
> >   spark.streaming.backpressure.enabled = true
> >
> > Not sure if pre-fetching or caching is messing things up.
> >
> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID
> > 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError:
> assertion
> > failed: Failed to get records for spark-executor-StreamingEventSplitProd
> > mt_event 6 49091480 after polling for 4096
> > at scala.Predef$.assert(Predef.scala:170)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> > at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> > at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:193)
> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> > at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)
> >
> >
> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger 
> wrote:
> >>
> >> That's not what I would have expected to happen with a lower cache
> >> setting, but in general disabling the cache isn't something you want
> >> to do with the new kafka consumer.
> >>
> >>
> >> As far as the original issue, are you seeing those polling errors
> >> intermittently, or consistently?  From your description, it sounds
> >> like retry is working correctly.
> >>
> >>
> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth  wrote:
> >> > Setting those two results in below exception.
> >> > No.of executors < no.of partitions. Could that be triggering this?
> >> >
> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0
> >> > (TID 9)
> >> > java.util.ConcurrentModificationException: KafkaConsumer is not safe
> for
> >> > multi-threaded access
> >> > at
> >> >
> >> > org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1430)
> >> > at
> >> >
> >> > org.apache.kafka.clients.consumer.KafkaConsumer.close(
> KafkaConsumer.java:1360)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$
> anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> >> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> >> > at java.util.HashMap.putVal(Unknown Source)
> >> > at java.util.HashMap.put(Unknown Source)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.
> get(CachedKafkaConsumer.scala:158)
> >> > at
> >> >
> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(
> KafkaRDD.scala:210)
> >> > at
> >> > org.apache.spark.streaming.kafka010.KafkaRDD.compute(
> KafkaRDD.scala:185)
> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> > at
> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> > at
> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> > at
> >> > org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> > at
> >> >
> >> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:79)
> >> > at
> >> >
> >> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
> >> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
> >> > at 

Predict a single vector with the new spark.ml API to avoid groupByKey() after a flatMap()?

2016-10-20 Thread jglov
Is there a way to predict a single vector with the new spark.ml API? In my case
I want to do this within a map() so I can avoid calling groupByKey() after a
flatMap():

*Current code (pyspark):*

% Given 'model', 'rdd', and a function 'split_element' that splits an
element of the RDD into a list of elements (and assuming
% each element has both a value and a key so that groupByKey will work to
merge them later)

split_rdd = rdd.flatMap(split_element)
split_results = model.transform(split_rdd.toDF()).rdd
return split_results.groupByKey()

*Desired code:*

split_rdd = rdd.map(split_element)
split_results = split_rdd.map(lambda elem_list: [model.transformOne(elem)
for elem in elem_list])
return split_results



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Predict-a-single-vector-with-the-new-spark-ml-API-to-avoid-groupByKey-after-a-flatMap-tp27932.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Mlib RandomForest (Spark 2.0) predict a single vector

2016-10-20 Thread jglov
I would also like to know if there is a way to predict a single vector with
the new spark.ml API. In my case I want to do this within a map() so I can
avoid calling groupByKey() after a flatMap():

*Current code (pyspark):*

# Given 'model', 'rdd', and a function 'split_element' that splits an
# element of the RDD into a list of elements (and assuming each element
# has both a value and a key, so that groupByKey will work to merge them later)

split_rdd = rdd.flatMap(split_element)
split_results = model.transform(split_rdd.toDF()).rdd
return split_results.groupByKey()

*Desired code:*

split_rdd = rdd.map(split_element)
split_results = split_rdd.map(lambda elem_list: [model.transformOne(elem)
for elem in elem_list])
return split_results
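
If falling back to the older mllib API is an option, a single vector can be
scored directly there, which is essentially what I am after. A rough Scala
sketch (the model path and featuresRdd are placeholders of mine, and I have
not benchmarked this):

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.tree.model.RandomForestModel

// RandomForestModel exposes predict(Vector), so no DataFrame is needed per prediction
val mllibModel = RandomForestModel.load(sc, "/path/to/model")
val single: Double = mllibModel.predict(Vectors.dense(0.1, 0.2, 0.3))

// the model object is serializable, so it can also be used inside map()
val scored = featuresRdd.map((features: Vector) => mllibModel.predict(features))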




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Mlib-RandomForest-Spark-2-0-predict-a-single-vector-tp27447p27931.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



HashingTF for TF.IDF computation

2016-10-20 Thread Ciumac Sergiu
Hello everyone,

I'm having a usage issue with the HashingTF class from Spark MLlib.

I'm computing TF.IDF on a set of terms/documents, which I later use to
identify the most important terms in each input document.

Below is a short code snippet which outlines the example (2 documents with
2 words each, executed on Spark 2.0).

import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

val documentsToEvaluate = sc.parallelize(Array(Seq("Mars", "Jupiter"), Seq("Venus", "Mars")))
val hashingTF = new HashingTF()
val tf = hashingTF.transform(documentsToEvaluate)
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
documentsToEvaluate.zip(tfidf).saveAsTextFile("/tmp/tfidf")

The computation yields the following result:

(List(Mars, Jupiter),(1048576,[593437,962819],[0.4054651081081644,0.0]))
(List(Venus, Mars),(1048576,[798918,962819],[0.4054651081081644,0.0]))

My concern is that I can't get a mapping between the TF.IDF weights and the
initial terms (i.e. Mars : 0.0, Jupiter : 0.4, Venus : 0.4; you may notice that
the weights and term indices do not correspond after zipping the two sequences).
I can only identify the hash-to-weight (i.e. 593437 : 0.4) mappings.

I know HashingTF uses the hashing trick to compute TF. My question is: is it
possible to retrieve the term/weight mapping, or was HashingTF not designed to
handle this use case? If the latter, what other implementation of TF.IDF would
you recommend?

I may continue the computation with the (*hash:weight*) tuple, though getting
the initial (*term:weight*) would make debugging a lot easier later down the
pipeline.
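
One direction I have been experimenting with is to re-hash each term with the
same HashingTF instance and look up its weight in the TF.IDF vector. A rough
sketch (it assumes indexOf returns the same bucket that transform() used, and
it ignores hash collisions):

val termWeights = documentsToEvaluate.zip(tfidf).map { case (terms, vector) =>
  // re-hash each term to find its bucket, then read the weight from the TF.IDF vector
  terms.map(term => term -> vector(hashingTF.indexOf(term)))
}
termWeights.collect().foreach(println)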

Any response will be greatly appreciated!

Regards, Sergiu Ciumac


Re: spark pi example fail on yarn

2016-10-20 Thread Amit Tank
I recently started learning Spark, so I may be completely wrong here, but I
was facing a similar problem with SparkPi on YARN. After changing to
yarn-cluster mode it worked perfectly fine.

Thank you,
Amit

On Thursday, October 20, 2016, Saisai Shao  wrote:

> Looks like ApplicationMaster is killed by SIGTERM.
>
> 16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
> 16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:
>
> This container may be killed by yarn NodeManager or other processes, you'd
> better check yarn log to dig out more details.
>
> Thanks
> Saisai
>
> On Thu, Oct 20, 2016 at 6:51 PM, Li Li  > wrote:
>
>> I am setting up a small yarn/spark cluster. hadoop/yarn version is
>> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
>> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
>> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
>> org.apache.spark.examples.SparkPi --master yarn-client
>> examples/jars/spark-examples_2.11-2.0.1.jar 1
>> it fails and the first error is:
>> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
>> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
>> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO
>> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
>> registered as NettyRpcEndpointRef(null)
>> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
>> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
>> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
>> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
>> /proxy/application_1476957324184_0002
>> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
>> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
>> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
>> SchedulerBackend is ready for scheduling beginning after waiting
>> maxRegisteredResourcesWaitingTime: 3(ms)
>> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
>> SparkContext, some configuration may not take effect.
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,n
>> ull,AVAILABLE}
>> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
>> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
>> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
>> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
>> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
>> SparkPi.scala:38
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
>> SparkPi.scala:38) with 1 output partitions
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
>> ResultStage 0 (reduce at SparkPi.scala:38)
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage:
>> List()
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
>> 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
>> missing parents
>> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
>> values in memory (estimated size 1832.0 B, free 366.3 MB)
>> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0_piece0
>> stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB)
>> 16/10/20 18:12:13 INFO storage.BlockManagerInfo: Added
>> broadcast_0_piece0 in memory on 10.161.219.189:39161 (size: 1169.0 B,
>> free: 366.3 MB)
>> 16/10/20 18:12:13 INFO spark.SparkContext: Created broadcast 0 from
>> broadcast at DAGScheduler.scala:1012
>> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting 1
>> missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
>> SparkPi.scala:34)
>> 16/10/20 18:12:13 INFO cluster.YarnScheduler: Adding task set 0.0 with
>> 1 tasks
>> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
>> application has already exited with state FINISHED!
>> 16/10/20 18:12:14 INFO server.ServerConnector: Stopped
>> ServerConnector@389adf1d{HTTP/1.1}{0.0.0.0:4040}
>> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
>> o.s.j.s.ServletContextHandler@841e575{/stages/stage/kill,nul
>> l,UNAVAILABLE}
>> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
>> o.s.j.s.ServletContextHandler@66629f63{/api,null,UNAVAILABLE}
>> 16/10/20 18:12:14 INFO 

Re: Microbatches length

2016-10-20 Thread Paulo Candido
Thanks a lot.
I'll check it.
Regards.

Em qui, 20 de out de 2016 às 10:50, vincent gromakowski <
vincent.gromakow...@gmail.com> escreveu:

You can still implement your own logic with akka actors for instance. Based
on some threshold the actor can launch spark batch mode using the same
spark context... It's only an idea , no real experience.

Le 20 oct. 2016 1:31 PM, "Paulo Candido"  a écrit :

In this case I haven't any alternatives to get microbatches with same
length? Using another class or any configuration? I'm using socket.

Thank you for attention.

Em qui, 20 de out de 2016 às 09:24, 王贺(Gabriel) 
escreveu:

The interval is for time, so you won't get micro-batches in same data size
but same time length.

Yours sincerely,
Gabriel (王贺)
Mobile: +86 18621263813 <+86%20186%202126%203813>


On Thu, Oct 20, 2016 at 6:38 PM, pcandido  wrote:

Hello folks,

I'm using Spark Streaming. My question is simple:
The documentation says that microbatches arrive in intervals. The intervals
are in real time (minutes, seconds). I want to get microbatches with same
length, so, I can configure SS to return microbatches when it reach a
determined length?

Thanks.



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Microbatches-length-tp27927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-- 

Paulo Cândido

-- 

Paulo Cândido


Re: spark pi example fail on yarn

2016-10-20 Thread Elek, Marton

Try to set the memory size limits. For example:

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
yarn --deploy-mode cluster --driver-memory 4g 
--executor-memory 2g --executor-cores 1 
./examples/jars/spark-examples_2.11-2.0.0.2.5.2.0-47.jar


By default YARN kills containers that exceed not only the physical but also
the virtual memory limit.


You could also try to set yarn.nodemanager.vmem-check-enabled to false in
yarn-site.xml.
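
A minimal yarn-site.xml snippet for that would look like this (note that it
disables the virtual-memory check for the whole NodeManager, so use it with care):

<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>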

Regards
Marton


On 10/20/16 4:02 PM, Saisai Shao wrote:

Looks like ApplicationMaster is killed by SIGTERM.

16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:

This container may be killed by yarn NodeManager or other processes,
you'd better check yarn log to dig out more details.

Thanks
Saisai

On Thu, Oct 20, 2016 at 6:51 PM, Li Li > wrote:

I am setting up a small yarn/spark cluster. hadoop/yarn version is
2.7.3 and I can run wordcount map-reduce correctly in yarn.
And I am using  spark-2.0.1-bin-hadoop2.7 using command:
~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
org.apache.spark.examples.SparkPi --master yarn-client
examples/jars/spark-examples_2.11-2.0.1.jar 1
it fails and the first error is:
16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
16/10/20 18:12:03 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
16/10/20 18:12:12 INFO
cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
registered as NettyRpcEndpointRef(null)
16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002
),
/proxy/application_1476957324184_0002
16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
SchedulerBackend is ready for scheduling beginning after waiting
maxRegisteredResourcesWaitingTime: 3(ms)
16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
SparkContext, some configuration may not take effect.
16/10/20 18:12:12 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
16/10/20 18:12:12 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
16/10/20 18:12:12 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
16/10/20 18:12:12 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,null,AVAILABLE}
16/10/20 18:12:12 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
'/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
SparkPi.scala:38
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
SparkPi.scala:38) with 1 output partitions
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
ResultStage 0 (reduce at SparkPi.scala:38)
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final
stage: List()
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
missing parents
16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 1832.0 B, free 366.3 MB)
16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0_piece0
stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB)
16/10/20 18:12:13 INFO storage.BlockManagerInfo: Added
broadcast_0_piece0 in memory on 10.161.219.189:39161
 (size: 1169.0 B,
free: 366.3 MB)
16/10/20 18:12:13 INFO spark.SparkContext: Created broadcast 0 from
broadcast at DAGScheduler.scala:1012
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting 1
missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
SparkPi.scala:34)
16/10/20 18:12:13 INFO cluster.YarnScheduler: Adding task set 0.0 with
1 tasks
16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
application has already exited with state FINISHED!
16/10/20 18:12:14 INFO server.ServerConnector: 

Re: spark pi example fail on yarn

2016-10-20 Thread Saisai Shao
Looks like the ApplicationMaster was killed by SIGTERM.

16/10/20 18:12:04 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
16/10/20 18:12:04 INFO yarn.ApplicationMaster: Final app status:

This container may have been killed by the YARN NodeManager or another
process; you'd better check the YARN logs to dig out more details.

Thanks
Saisai

On Thu, Oct 20, 2016 at 6:51 PM, Li Li  wrote:

> I am setting up a small yarn/spark cluster. hadoop/yarn version is
> 2.7.3 and I can run wordcount map-reduce correctly in yarn.
> And I am using  spark-2.0.1-bin-hadoop2.7 using command:
> ~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
> org.apache.spark.examples.SparkPi --master yarn-client
> examples/jars/spark-examples_2.11-2.0.1.jar 1
> it fails and the first error is:
> 16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
> BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
> 16/10/20 18:12:03 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
> 16/10/20 18:12:12 INFO
> cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
> registered as NettyRpcEndpointRef(null)
> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
> Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
> Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
> http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
> /proxy/application_1476957324184_0002
> 16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
> 16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
> SchedulerBackend is ready for scheduling beginning after waiting
> maxRegisteredResourcesWaitingTime: 3(ms)
> 16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
> SparkContext, some configuration may not take effect.
> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,null,AVAILABLE}
> 16/10/20 18:12:12 INFO handler.ContextHandler: Started
> o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
> 16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
> '/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
> 16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
> SparkPi.scala:38
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
> SparkPi.scala:38) with 1 output partitions
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
> ResultStage 0 (reduce at SparkPi.scala:38)
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage:
> List()
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
> 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
> missing parents
> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
> values in memory (estimated size 1832.0 B, free 366.3 MB)
> 16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0_piece0
> stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB)
> 16/10/20 18:12:13 INFO storage.BlockManagerInfo: Added
> broadcast_0_piece0 in memory on 10.161.219.189:39161 (size: 1169.0 B,
> free: 366.3 MB)
> 16/10/20 18:12:13 INFO spark.SparkContext: Created broadcast 0 from
> broadcast at DAGScheduler.scala:1012
> 16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting 1
> missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
> SparkPi.scala:34)
> 16/10/20 18:12:13 INFO cluster.YarnScheduler: Adding task set 0.0 with
> 1 tasks
> 16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
> application has already exited with state FINISHED!
> 16/10/20 18:12:14 INFO server.ServerConnector: Stopped
> ServerConnector@389adf1d{HTTP/1.1}{0.0.0.0:4040}
> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@841e575{/stages/stage/kill,null,UNAVAILABLE}
> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@66629f63{/api,null,UNAVAILABLE}
> 16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
> o.s.j.s.ServletContextHandler@2b62442c{/,null,UNAVAILABLE}
>
>
> I also use yarn log to get logs from yarn(total log is very lengthy in
> attachement):
> 16/10/20 18:12:03 INFO yarn.ExecutorRunnable:
> 
> ===
> YARN executor launch context:
>   env:
> CLASSPATH ->
> {{PWD}}{{PWD}}/__spark_conf__{{PWD}}/__spark_
> 

Re: Joins of typed datasets

2016-10-20 Thread daunnc
The situation changes a bit, and the workaround is to add a restriction on `K`
(K should be a subtype of Product).

Though right now I have another error:

org.apache.spark.sql.AnalysisException: cannot resolve '(`key` = `key`)' due
to data type mismatch: differing types in '(`key` = `key`)'
(struct and struct).;
[info]   at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info]   at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
[info]   at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
[info]   at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
[info]   at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
[info]   at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
[info]   at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
[info]   at
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:191)
[info]   at
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:202)
[info]   at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$5.apply(QueryPlan.scala:210)

Maybe it is caused by the fact that a certain key extends Product2?

case class SpatialKey(col: Int, row: Int) extends Product2[Int, Int] // a join by this key throws the error above
case class SpaceTimeKey(col: Int, row: Int, instant: Long) // a join by this key produces no errors



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joins-of-typed-datasets-tp27924p27929.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Microbatches length

2016-10-20 Thread vincent gromakowski
You can still implement your own logic, with Akka actors for instance. Based
on some threshold, the actor can launch a Spark batch job using the same
Spark context... It's only an idea, no real experience with it.

Le 20 oct. 2016 1:31 PM, "Paulo Candido"  a écrit :

> In this case I haven't any alternatives to get microbatches with same
> length? Using another class or any configuration? I'm using socket.
>
> Thank you for attention.
>
> Em qui, 20 de out de 2016 às 09:24, 王贺(Gabriel) 
> escreveu:
>
>> The interval is for time, so you won't get micro-batches in same data
>> size but same time length.
>>
>> Yours sincerely,
>> Gabriel (王贺)
>> Mobile: +86 18621263813 <+86%20186%202126%203813>
>>
>>
>> On Thu, Oct 20, 2016 at 6:38 PM, pcandido  wrote:
>>
>> Hello folks,
>>
>> I'm using Spark Streaming. My question is simple:
>> The documentation says that microbatches arrive in intervals. The
>> intervals
>> are in real time (minutes, seconds). I want to get microbatches with same
>> length, so, I can configure SS to return microbatches when it reach a
>> determined length?
>>
>> Thanks.
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Microbatches-length-tp27927.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>> --
>
> Paulo Cândido
>


Re: Spark Random Forest training cost same time on yarn as on standalone

2016-10-20 Thread Xi Shen
If you are running locally, I do not see the point of starting 32 executors
with 2 cores each.

Also, you can check the Spark web console to find out where the time is spent.

Also, you may want to read
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
.


On Thu, Oct 20, 2016 at 6:21 PM 陈哲  wrote:

> I'm training random forest model using spark2.0 on yarn with cmd like:
> $SPARK_HOME/bin/spark-submit \
>   --class com.netease.risk.prediction.HelpMain --master yarn
> --deploy-mode client --driver-cores 1 --num-executors 32 --executor-cores 2 
> --driver-memory
> 10g --executor-memory 6g \
>   --conf spark.rpc.askTimeout=3000 --conf spark.rpc.lookupTimeout=3000
> --conf spark.rpc.message.maxSize=2000  --conf spark.driver.maxResultSize=0
> \
> 
>
> the training process cost almost 8 hours
>
> And I tried training model on local machine with master(local[4]) , the
> whole process still cost 8 - 9 hours.
>
> My question is why running on yarn doesn't save time ? is this suppose to
> be distributed, with 32 executors ? And am I missing anything or what I can
> do to improve this and save more time ?
>
> Thanks
>
> --


Thanks,
David S.


Re: Ensuring an Avro File is NOT Splitable

2016-10-20 Thread Jörn Franke
What is the use case for this? You will reduce performance significantly.
Nevertheless, the way you propose is the way to go, though I do not recommend it.

> On 20 Oct 2016, at 14:00, Ashan Taha  wrote:
> 
> Hi
>  
> What’s the best way to make sure an Avro file is NOT Splitable when read in 
> Spark?
> Would you override the AvroKeyInputFormat.issplitable (to return false) and 
> then call this using newAPIHadoopRDD? Or is there a better way using the 
> sqlContext.read?
>  
> Thanks in advance


Ensuring an Avro File is NOT Splitable

2016-10-20 Thread Ashan Taha
Hi

What's the best way to make sure an Avro file is NOT splittable when read in
Spark?
Would you override AvroKeyInputFormat.isSplitable (to return false) and then
read the file via newAPIHadoopRDD? Or is there a better way using
sqlContext.read?
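
Roughly what I have in mind is something like the following (an untested Scala
sketch; the subclass name is mine and the path is a placeholder):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.JobContext

// returning false should force one task per file instead of one task per block
class NonSplittableAvroKeyInputFormat extends AvroKeyInputFormat[GenericRecord] {
  override def isSplitable(context: JobContext, filename: Path): Boolean = false
}

val records = sc.newAPIHadoopFile(
  "/path/to/data.avro",
  classOf[NonSplittableAvroKeyInputFormat],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])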

Thanks in advance


Re: Microbatches length

2016-10-20 Thread Paulo Candido
In this case don't I have any alternative for getting microbatches of the same
length? Maybe using another class or some configuration? I'm using a socket.

Thank you for attention.

Em qui, 20 de out de 2016 às 09:24, 王贺(Gabriel) 
escreveu:

> The interval is for time, so you won't get micro-batches in same data size
> but same time length.
>
> Yours sincerely,
> Gabriel (王贺)
> Mobile: +86 18621263813 <+86%20186%202126%203813>
>
>
> On Thu, Oct 20, 2016 at 6:38 PM, pcandido  wrote:
>
> Hello folks,
>
> I'm using Spark Streaming. My question is simple:
> The documentation says that microbatches arrive in intervals. The intervals
> are in real time (minutes, seconds). I want to get microbatches with same
> length, so, I can configure SS to return microbatches when it reach a
> determined length?
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Microbatches-length-tp27927.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
> --

Paulo Cândido


Re: Microbatches length

2016-10-20 Thread Gabriel
The interval is time-based, so you won't get micro-batches of the same data
size, only micro-batches covering the same time length.

Yours sincerely,
Gabriel (王贺)
Mobile: +86 18621263813


On Thu, Oct 20, 2016 at 6:38 PM, pcandido  wrote:

> Hello folks,
>
> I'm using Spark Streaming. My question is simple:
> The documentation says that microbatches arrive in intervals. The intervals
> are in real time (minutes, seconds). I want to get microbatches with same
> length, so, I can configure SS to return microbatches when it reach a
> determined length?
>
> Thanks.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Microbatches-length-tp27927.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Expression Encoder for Map[Int, String] in a custom Aggregator on a Dataset

2016-10-20 Thread Anton Okolnychyi
Hi all,

I am trying to use my custom Aggregator on a GroupedDataset of case classes
to create a hash map using Spark SQL 1.6.2.
My Encoder[Map[Int, String]] is not capable of reconstructing the reduced
values if I define it via ExpressionEncoder().
However, everything works fine if I define it as Encoders.kryo[Map[Int, String]].
I would like to know if I am doing anything wrong.

I have the following use case:

  import org.apache.spark.sql.{Encoder, SQLContext}
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
  import org.apache.spark.sql.expressions.Aggregator

  implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()

  val sparkContext = ...
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  case class StopPoint(line: String, sequenceNumber: Int, id: String)

  val stopPointDS = Seq(StopPoint("33", 1, "1"), StopPoint("33", 2,
"2")).toDS()

  val stopPointSequenceMap = new Aggregator[StopPoint, Map[Int, String],
Map[Int, String]] {
override def zero = Map[Int, String]()
override def reduce(map: Map[Int, String], stopPoint: StopPoint) = {
  map.updated(stopPoint.sequenceNumber, stopPoint.id)
}
override def merge(map: Map[Int, String], anotherMap: Map[Int, String])
= {
  map ++ anotherMap
}
override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

  val resultMap = stopPointDS
.groupBy(_.line)
.agg(stopPointSequenceMap)
.collect()
.toMap
In spark.sql.execution.TypedAggregateExpression.scala, I can see that each
entry is inserted into the initial map correctly (i.e. the reduce() method
works properly).
However, my encoder cannot reconstruct the map from the reduce phase during
the merge phase, and I get an empty Map as the result of the merge method.
If I replace my expression-based encoder with
org.apache.spark.sql.Encoders.kryo[Map[Int, String]], I get the correct result:
(33, Map(1 -> 1, 2 -> 2))
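
In other words, the only change that makes it work for me is the encoder
definition (sketch):

  implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
    org.apache.spark.sql.Encoders.kryo[Map[Int, String]]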

Any ideas/suggestions are more than welcome.

Sincerely,
Anton Okolnychyi


spark pi example fail on yarn

2016-10-20 Thread Li Li
I am setting up a small yarn/spark cluster. hadoop/yarn version is
2.7.3 and I can run wordcount map-reduce correctly in yarn.
And I am using spark-2.0.1-bin-hadoop2.7 with the command:
~/spark-2.0.1-bin-hadoop2.7$ ./bin/spark-submit --class
org.apache.spark.examples.SparkPi --master yarn-client
examples/jars/spark-examples_2.11-2.0.1.jar 1
it fails and the first error is:
16/10/20 18:12:03 INFO storage.BlockManagerMaster: Registered
BlockManager BlockManagerId(driver, 10.161.219.189, 39161)
16/10/20 18:12:03 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@76ad6715{/metrics/json,null,AVAILABLE}
16/10/20 18:12:12 INFO
cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster
registered as NettyRpcEndpointRef(null)
16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend: Add WebUI
Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,
Map(PROXY_HOSTS -> ai-hz1-spark1, PROXY_URI_BASES ->
http://ai-hz1-spark1:8088/proxy/application_1476957324184_0002),
/proxy/application_1476957324184_0002
16/10/20 18:12:12 INFO ui.JettyUtils: Adding filter:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
16/10/20 18:12:12 INFO cluster.YarnClientSchedulerBackend:
SchedulerBackend is ready for scheduling beginning after waiting
maxRegisteredResourcesWaitingTime: 3(ms)
16/10/20 18:12:12 WARN spark.SparkContext: Use an existing
SparkContext, some configuration may not take effect.
16/10/20 18:12:12 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@489091bd{/SQL,null,AVAILABLE}
16/10/20 18:12:12 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@1de9b505{/SQL/json,null,AVAILABLE}
16/10/20 18:12:12 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@378f002a{/SQL/execution,null,AVAILABLE}
16/10/20 18:12:12 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@2cc75074{/SQL/execution/json,null,AVAILABLE}
16/10/20 18:12:12 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@2d64160c{/static/sql,null,AVAILABLE}
16/10/20 18:12:12 INFO internal.SharedState: Warehouse path is
'/home/hadoop/spark-2.0.1-bin-hadoop2.7/spark-warehouse'.
16/10/20 18:12:13 INFO spark.SparkContext: Starting job: reduce at
SparkPi.scala:38
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Got job 0 (reduce at
SparkPi.scala:38) with 1 output partitions
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Final stage:
ResultStage 0 (reduce at SparkPi.scala:38)
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Missing parents: List()
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting ResultStage
0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no
missing parents
16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 1832.0 B, free 366.3 MB)
16/10/20 18:12:13 INFO memory.MemoryStore: Block broadcast_0_piece0
stored as bytes in memory (estimated size 1169.0 B, free 366.3 MB)
16/10/20 18:12:13 INFO storage.BlockManagerInfo: Added
broadcast_0_piece0 in memory on 10.161.219.189:39161 (size: 1169.0 B,
free: 366.3 MB)
16/10/20 18:12:13 INFO spark.SparkContext: Created broadcast 0 from
broadcast at DAGScheduler.scala:1012
16/10/20 18:12:13 INFO scheduler.DAGScheduler: Submitting 1
missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
SparkPi.scala:34)
16/10/20 18:12:13 INFO cluster.YarnScheduler: Adding task set 0.0 with
1 tasks
16/10/20 18:12:14 ERROR cluster.YarnClientSchedulerBackend: Yarn
application has already exited with state FINISHED!
16/10/20 18:12:14 INFO server.ServerConnector: Stopped
ServerConnector@389adf1d{HTTP/1.1}{0.0.0.0:4040}
16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
o.s.j.s.ServletContextHandler@841e575{/stages/stage/kill,null,UNAVAILABLE}
16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
o.s.j.s.ServletContextHandler@66629f63{/api,null,UNAVAILABLE}
16/10/20 18:12:14 INFO handler.ContextHandler: Stopped
o.s.j.s.ServletContextHandler@2b62442c{/,null,UNAVAILABLE}


I also used 'yarn logs' to get the logs from YARN (the full log is very
lengthy, see attachment):
16/10/20 18:12:03 INFO yarn.ExecutorRunnable:
===
YARN executor launch context:
  env:
CLASSPATH ->
{{PWD}}{{PWD}}/__spark_conf__{{PWD}}/__spark_libs__/*$HADOOP_CONF_DIR$HADOOP_COMMON_HOME/share/hadoop/common/*$HADOOP_COMMON_HOME/share/hadoop/common/lib/*$HADOOP_HDFS_HOME/share/hadoop/hdfs/*$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*$HADOOP_YARN_HOME/share/hadoop/yarn/*$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*
SPARK_LOG_URL_STDERR ->
http://ai-hz1-spark3:8042/node/containerlogs/container_1476957324184_0002_01_03/hadoop/stderr?start=-4096
SPARK_YARN_STAGING_DIR ->

Microbatches length

2016-10-20 Thread pcandido
Hello folks,

I'm using Spark Streaming. My question is simple:
The documentation says that microbatches arrive at intervals, and the intervals
are defined in real time (minutes, seconds). I want to get microbatches of the
same length, so can I configure Spark Streaming to return a microbatch once it
reaches a determined length?

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Microbatches-length-tp27927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Why the json file used by sparkSession.read.json must be a valid json object per line

2016-10-20 Thread Steve Loughran

> On 19 Oct 2016, at 21:46, Jakob Odersky  wrote:
> 
> Another reason I could imagine is that files are often read from HDFS,
> which by default uses line terminators to separate records.
> 
> It is possible to implement your own hdfs delimiter finder, however
> for arbitrary json data, finding that delimiter would require stateful
> parsing of the file and would be difficult to parallelize across a
> cluster.
> 


good point. 

If you are creating your own files containing a list of JSON records, then you
could do your own encoding, say one with a header for each record ('J'+'S'+'O'+'N'
+ an int64 length) and split on that: you don't need to scan a record to know its
length, and you can scan a large document counting its records simply through a
sequence of skip + read(byte[8]) operations.
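
For what it's worth, when each file holds a single (possibly multi-line) JSON
document, one workaround is to read whole files and hand the resulting strings
to the JSON reader. A sketch (the path is a placeholder, and every file is
pulled into memory as a single record, so this only suits reasonably small
documents):

val wholeDocs = spark.sparkContext.wholeTextFiles("/path/to/json-dir").map(_._2)
val df = spark.read.json(wholeDocs)  // each whole-file string is parsed as one JSON record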

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Random Forest training cost same time on yarn as on standalone

2016-10-20 Thread 陈哲
I'm training random forest model using spark2.0 on yarn with cmd like:
$SPARK_HOME/bin/spark-submit \
  --class com.netease.risk.prediction.HelpMain --master yarn --deploy-mode
client --driver-cores 1 --num-executors 32 --executor-cores 2 --driver-memory
10g --executor-memory 6g \
  --conf spark.rpc.askTimeout=3000 --conf spark.rpc.lookupTimeout=3000
--conf spark.rpc.message.maxSize=2000  --conf spark.driver.maxResultSize=0
\


the training process cost almost 8 hours

And I tried training model on local machine with master(local[4]) , the
whole process still cost 8 - 9 hours.

My question is why running on yarn doesn't save time ? is this suppose to
be distributed, with 32 executors ? And am I missing anything or what I can
do to improve this and save more time ?

Thanks


Re: Can i display message on console when use spark on yarn?

2016-10-20 Thread ayan guha
What exactly do you mean by YARN console? We use spark-submit and it
generates exactly the same log as you mentioned on the driver console.

On Thu, Oct 20, 2016 at 8:21 PM, Jone Zhang  wrote:

> I submit spark with "spark-submit --master yarn-cluster --deploy-mode
> cluster"
> How can i display message on yarn console.
> I expect it to be like this:
>
> .
> 16/10/20 17:12:53 main INFO org.apache.spark.deploy.yarn.Client>SPK>
> Application report for application_1453970859007_481440 (state: RUNNING)
> 16/10/20 17:12:58 main INFO org.apache.spark.deploy.yarn.Client>SPK>
> Application report for application_1453970859007_481440 (state: RUNNING)
> 16/10/20 17:13:03 main INFO org.apache.spark.deploy.yarn.Client>SPK>
> Application report for application_1453970859007_481440 (state: RUNNING)
> 16/10/20 17:13:08 main INFO org.apache.spark.deploy.yarn.Client>SPK>
> Application report for application_1453970859007_481440 (state: RUNNING)
> 16/10/20 17:13:13 main INFO org.apache.spark.deploy.yarn.Client>SPK>
> Application report for application_1453970859007_481440 (state: FINISHED)
> 16/10/20 17:13:13 main INFO org.apache.spark.deploy.yarn.Client>SPK>
>  client token: N/A
>  diagnostics: N/A
>  ApplicationMaster host: 10.51.215.100
>  ApplicationMaster RPC port: 0
>  queue: root.default
>  start time: 1476954698645
>  final status: SUCCEEDED
>  tracking URL: http://10.179.20.47:8080/proxy/application_
> 1453970859007_481440/history/application_1453970859007_481440/1
>  user: mqq
> ===Spark Task Result is ===
> ===some message want to display===
> 16/10/20 17:13:13 Thread-3 INFO org.apache.spark.util.ShutdownHookManager>SPK>
> Shutdown hook called
> 16/10/20 17:13:13 Thread-3 INFO org.apache.spark.util.ShutdownHookManager>SPK>
> Deleting directory /data/home/spark_tmp/spark-5b9f434b-5837-46e6-9625-
> c4656b86af9e
>
> Thanks.
>
>
>


-- 
Best Regards,
Ayan Guha


Re: pyspark dataframe codes for lead lag to column

2016-10-20 Thread ayan guha
Yes, there are similar functions available; depending on your Spark version,
look up the pyspark.sql.functions module documentation. I also prefer to use
SQL directly within pyspark.
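
For example, with window functions (Scala shown here; pyspark.sql.functions and
pyspark.sql.window expose the same lag/lead and Window API; the DataFrame df and
the ordering column are assumptions of mine):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

// without partitionBy all rows go through one window partition, fine for small data
val w = Window.orderBy("value")  // assumes a column named "value" defines the row order
val withLeadLag = df
  .withColumn("lag_1", lag(col("value"), 1, -1).over(w))    // -1 when there is no previous row
  .withColumn("lead_1", lead(col("value"), 1, -1).over(w))  // -1 when there is no next row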

On Thu, Oct 20, 2016 at 8:18 PM, Mendelson, Assaf 
wrote:

> Depending on your usecase, you may want to take a look at window functions
>
>
>
> *From:* muhammet pakyürek [mailto:mpa...@hotmail.com]
> *Sent:* Thursday, October 20, 2016 11:36 AM
> *To:* user@spark.apache.org
> *Subject:* pyspark dataframe codes for lead lag to column
>
>
>
>
>
>
>
> is there pyspark dataframe codes for lead lag to column?
>
>
>
> lead/lag column is something
>
>
>
> 1  lag   -1lead 2
>
> 213
>
> 324
>
> 435
>
> 54   -1
>



-- 
Best Regards,
Ayan Guha


Can i display message on console when use spark on yarn?

2016-10-20 Thread Jone Zhang
I submit Spark with "spark-submit --master yarn-cluster --deploy-mode
cluster".
How can I display a message on the YARN console?
I expect it to be like this:

.
16/10/20 17:12:53 main INFO org.apache.spark.deploy.yarn.Client>SPK>
Application report for application_1453970859007_481440 (state: RUNNING)
16/10/20 17:12:58 main INFO org.apache.spark.deploy.yarn.Client>SPK>
Application report for application_1453970859007_481440 (state: RUNNING)
16/10/20 17:13:03 main INFO org.apache.spark.deploy.yarn.Client>SPK>
Application report for application_1453970859007_481440 (state: RUNNING)
16/10/20 17:13:08 main INFO org.apache.spark.deploy.yarn.Client>SPK>
Application report for application_1453970859007_481440 (state: RUNNING)
16/10/20 17:13:13 main INFO org.apache.spark.deploy.yarn.Client>SPK>
Application report for application_1453970859007_481440 (state: FINISHED)
16/10/20 17:13:13 main INFO org.apache.spark.deploy.yarn.Client>SPK>
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: 10.51.215.100
 ApplicationMaster RPC port: 0
 queue: root.default
 start time: 1476954698645
 final status: SUCCEEDED
 tracking URL:
http://10.179.20.47:8080/proxy/application_1453970859007_481440/history/application_1453970859007_481440/1
 user: mqq
===Spark Task Result is ===
===some message want to display===
16/10/20 17:13:13 Thread-3 INFO
org.apache.spark.util.ShutdownHookManager>SPK> Shutdown hook called
16/10/20 17:13:13 Thread-3 INFO
org.apache.spark.util.ShutdownHookManager>SPK> Deleting directory
/data/home/spark_tmp/spark-5b9f434b-5837-46e6-9625-c4656b86af9e

Thanks.


RE: pyspark dataframe codes for lead lag to column

2016-10-20 Thread Mendelson, Assaf
Depending on your use case, you may want to take a look at window functions.

From: muhammet pakyürek [mailto:mpa...@hotmail.com]
Sent: Thursday, October 20, 2016 11:36 AM
To: user@spark.apache.org
Subject: pyspark dataframe codes for lead lag to column





is there pyspark dataframe codes for lead lag to column?

lead/lag column is something

1  lag   -1lead 2
213
324
435
54   -1


Where condition on columns of Arrays does no longer work in spark 2

2016-10-20 Thread filthysocks
I have a Column in a DataFrame that contains Arrays and I want to filter for
equality. It works fine in Spark 1.6 but not in 2.0.

In Spark 1.6.2:

import org.apache.spark.sql.SQLContext

case class DataTest(lists: Seq[Int])

val sql = new SQLContext(sc)
val data = sql.createDataFrame(sc.parallelize(Seq(
  DataTest(Seq(1)),
  DataTest(Seq(4, 5, 6))
)))
data.registerTempTable("uiae")
sql.sql(s"SELECT lists FROM uiae WHERE lists=Array(1)").collect().foreach(println)

returns: [WrappedArray(1)]

In Spark 2.0.0:

import spark.implicits._

case class DataTest(lists: Seq[Int])

val data = Seq(DataTest(Seq(1)), DataTest(Seq(4, 5, 6))).toDS()
data.createOrReplaceTempView("uiae")
spark.sql(s"SELECT lists FROM uiae WHERE lists=Array(1)").collect().foreach(println)

returns: nothing

Is that a bug? Or is it just done differently in Spark 2?
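
For completeness, the same comparison expressed through the Column API, which I
have not fully verified on 2.0.0 either (it may well show the same behaviour):

import org.apache.spark.sql.functions.{array, col, lit}

// compare the array column against a literal single-element array
data.filter(col("lists") === array(lit(1))).show()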



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Where-condition-on-columns-of-Arrays-does-no-longer-work-in-spark-2-tp27926.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Dataframe schema...

2016-10-20 Thread Michael Armbrust
What is the issue you see when unioning?

On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar  wrote:

> Hello Michael,
>
> Thank you for looking into this query. In my case there seem to be an
> issue when I union a parquet file read from disk versus another dataframe
> that I construct in-memory. The only difference I see is the containsNull =
> true. In fact, I do not see any errors with union on the simple schema of
> "col1 thru col4" above. But the problem seem to exist only on that
> "some_histogram" column which contains the mixed containsNull = true/false.
> Let me know if this helps.
>
> Thanks,
> Muthu
>
>
>
> On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust 
> wrote:
>
>> Nullable is just a hint to the optimizer that its impossible for there to
>> be a null value in this column, so that it can avoid generating code for
>> null-checks.  When in doubt, we set nullable=true since it is always safer
>> to check.
>>
>> Why in particular are you trying to change the nullability of the column?
>>
>> On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar 
>> wrote:
>>
>>> Hello there,
>>>
>>> I am trying to understand how and when does DataFrame (or Dataset) sets
>>> nullable = true vs false on a schema.
>>>
>>> Here is my observation from a sample code I tried...
>>>
>>>
>>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
>>> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>>> lit("bla")).printSchema()
>>> root
>>>  |-- col1: integer (nullable = false)
>>>  |-- col2: string (nullable = true)
>>>  |-- col3: double (nullable = false)
>>>  |-- col4: string (nullable = false)
>>>
>>>
>>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
>>> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>>> lit("bla")).write.parquet("/tmp/sample.parquet")
>>>
>>> scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
>>> root
>>>  |-- col1: integer (nullable = true)
>>>  |-- col2: string (nullable = true)
>>>  |-- col3: double (nullable = true)
>>>  |-- col4: string (nullable = true)
>>>
>>>
>>> The place where this seem to get me into trouble is when I try to union
>>> one data-structure from in-memory (notice that in the below schema the
>>> highlighted element is represented as 'false' for in-memory created schema)
>>> and one from file that starts out with a schema like below...
>>>
>>>  |-- some_histogram: struct (nullable = true)
>>>  ||-- values: array (nullable = true)
>>>  |||-- element: double (containsNull = true)
>>>  ||-- freq: array (nullable = true)
>>>  |||-- element: long (containsNull = true)
>>>
>>> Is there a way to convert this attribute from true to false without
>>> running any mapping / udf on that column?
>>>
>>> Please advice,
>>> Muthu
>>>
>>
>>
>


pyspark dataframe codes for lead lag to column

2016-10-20 Thread muhammet pakyürek


Is there pyspark dataframe code for lead/lag on a column?

A lead/lag column is something like this:

value  lag  lead
1      -1   2
2      1    3
3      2    4
4      3    5
5      4    -1


How to iterate the element of an array in DataFrame?

2016-10-20 Thread Yan Facai
Hi, I want to extract the `weight` attribute from each element of an array
column, and combine them to construct a sparse vector.

### My data is like this:

scala> mblog_tags.printSchema
root
 |-- category.firstCategory: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- category: string (nullable = true)
 |||-- weight: string (nullable = true)


scala> mblog_tags.show(false)
+------------------------------------------------+
|category.firstCategory                          |
+------------------------------------------------+
|[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
|[[tagCategory_029, 0.9]]                        |
|[[tagCategory_029, 0.8]]                        |
+------------------------------------------------+


### And expected:
Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
Vectors.sparse(100, Array(29),  Array(0.9))
Vectors.sparse(100, Array(29),  Array(0.8))

How can I iterate over the elements of an array column in a DataFrame?
Thanks.
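
Something like the following is the direction I am aiming for (a rough sketch,
not yet verified; the vector size of 100, the use of ml.linalg, and parsing the
index out of the category name are my own assumptions):

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// turn the array of {category, weight} structs into a SparseVector,
// using the numeric suffix of the category name as the index
val toSparse = udf { (tags: Seq[Row]) =>
  val pairs = tags.map { tag =>
    val index = tag.getAs[String]("category").stripPrefix("tagCategory_").toInt
    val weight = tag.getAs[String]("weight").toDouble
    (index, weight)
  }
  Vectors.sparse(100, pairs)
}

// backticks because the column name itself contains a dot
val withVec = mblog_tags.withColumn("features", toSparse(col("`category.firstCategory`")))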