Spark is not evenly distributing data

2018-05-19 Thread SparkUser6







Getting Data From Hbase using Spark is Extremely Slow

2018-05-17 Thread SparkUser6
I have written four lines of a simple Spark program to process data in a Phoenix table:

queryString = getQueryFullString();  // Get data from Phoenix table: select col from table

JavaPairRDD<NullWritable, TestWritable> phRDD = jsc.newAPIHadoopRDD(
        configuration,
        PhoenixInputFormat.class,
        NullWritable.class,
        TestWritable.class);

JavaRDD<Long> rdd = phRDD.map(new Function<Tuple2<NullWritable, TestWritable>, Long>() {
    @Override  // Goal is to scan all the data
    public Long call(Tuple2<NullWritable, TestWritable> tuple) throws Exception {
        return 1L;
    }
});
System.out.println(rdd.count());

This program takes 2 hours to process 2 million records; can anyone help me understand what is wrong?
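
For what it is worth, the map-to-1L stage is not needed just to count rows; count() can be called directly on the Hadoop RDD, and the runtime will still be dominated by the full scan, so it mostly depends on how many Phoenix input splits and executors are actually running. A minimal sketch, assuming the same `jsc` and Phoenix-configured `configuration` as above (TestWritable being the poster's own DBWritable class):

// Sketch only: count directly on the Hadoop RDD, no extra map stage.
// Parallelism comes from the number of Phoenix input splits.
JavaPairRDD<NullWritable, TestWritable> phRDD = jsc.newAPIHadoopRDD(
        configuration,
        PhoenixInputFormat.class,
        NullWritable.class,
        TestWritable.class);
System.out.println(phRDD.count());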






Error submitting Spark Job in yarn-cluster mode on EMR

2018-05-08 Thread SparkUser6
I have a simple program that works fine in local mode, but I am having
issues when I try to run it in yarn-cluster mode. I know a NoSuchMethodError
usually happens when the compile-time and runtime versions mismatch, but I
made sure I built against the same version.

205  [main] INFO  org.spark_project.jetty.server.ServerConnector  - Started
Spark@29539e36{HTTP/1.1}{0.0.0.0:4040}
205  [main] INFO  org.spark_project.jetty.server.Server  - Started @3265ms
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.internal.config.package$.APP_CALLER_CONTEXT()Lorg/apache/spark/internal/config/OptionalConfigEntry;
    at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:163)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
    at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at com.voicebase.etl.PhoenixToElasticSearch.main(PhoenixToElasticSearch.java:54)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
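
A NoSuchMethodError on APP_CALLER_CONTEXT usually means the Spark jars on the cluster's runtime classpath differ from the version the job was compiled against. One quick check is to print the version Spark reports at runtime and compare it with the build dependency; a minimal sketch, an assumption rather than something from the original post (it uses a local master because the yarn path above fails before the context is created):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical helper: prints the Spark version actually loaded at runtime
// so it can be compared with the compile-time Spark dependency.
public class SparkVersionCheck {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("version-check");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        System.out.println("Runtime Spark version: " + jsc.version());
        jsc.stop();
    }
}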







Unable to Connect to Apache Phoenix From Spark

2018-05-05 Thread SparkUser6
Simple Java Program to Connect to Phoenix DB:
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("Using-spark-phoenix-df");
sparkConf.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
Dataset<Row> fromPhx = sqlContext
        .read()
        .format("jdbc")
        .options(ImmutableMap.of(
                "driver", "org.apache.phoenix.jdbc.PhoenixDriver",
                "url", "jdbc:phoenix:ZOOKEEPER_QUORUM_URL:/hbase",
                "dbtable", "STOCK"))
        .load();
System.out.println(fromPhx.toJavaRDD().count());

spark-submit --class PhoenixToDataFrame \
  --master yarn-client --deploy-mode client --executor-memory 1g \
  --files /etc/hbase/conf/hbase-site.xml \
  --name hbasespark \
  --conf "spark.executor.extraClassPath=/usr/lib/phoenix/phoenix-spark-4.13.0-HBase-1.4.jar:/usr/lib/phoenix/phoenix-core-4.13.0-HBase-1.4.jar:/usr/lib/phoenix-4.13.0-HBase-1.4-client.jar:/usr/lib/phoenix-client.jar:/usr/lib/phoenix-server.jar" \
  hbaseandspark-0.0.1-SNAPSHOT-jar-with-dependencies.jar

ERROR:

18/05/06 01:25:06 INFO Utils: Successfully started service 'sparkDriver' on
port 34053.
18/05/06 01:25:06 INFO SparkEnv: Registering MapOutputTracker
18/05/06 01:25:06 INFO SparkEnv: Registering BlockManagerMaster
18/05/06 01:25:06 INFO BlockManagerMasterEndpoint: Using
org.apache.spark.storage.DefaultTopologyMapper for getting topology
information
18/05/06 01:25:06 INFO BlockManagerMasterEndpoint:
BlockManagerMasterEndpoint up
18/05/06 01:25:06 INFO DiskBlockManager: Created local directory at
/mnt/tmp/blockmgr-08e5b083-e2bb-4389-8b30-29a1be08ee7e
18/05/06 01:25:06 INFO MemoryStore: MemoryStore started with capacity 414.4
MB
18/05/06 01:25:07 INFO SparkEnv: Registering OutputCommitCoordinator
18/05/06 01:25:07 INFO Utils: Successfully started service 'SparkUI' on port
4040.
18/05/06 01:25:07 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
http://10.16.129.152:4040
18/05/06 01:25:07 INFO SparkContext: Added JAR
file:/home/ec2-user/test/target/hbaseandspark-0.0.1-SNAPSHOT-jar-with-dependencies.jar
at
spark://10.16.129.152:34053/jars/hbaseandspark-0.0.1-SNAPSHOT-jar-with-dependencies.jar
with timestamp 1525569907456
18/05/06 01:25:07 INFO Executor: Starting executor ID driver on host
localhost
18/05/06 01:25:07 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 34659.
18/05/06 01:25:07 INFO NettyBlockTransferService: Server created on
10.16.129.152:34659
18/05/06 01:25:07 INFO BlockManager: Using
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
policy
18/05/06 01:25:07 INFO BlockManagerMaster: Registering BlockManager
BlockManagerId(driver, 10.16.129.152, 34659, None)
18/05/06 01:25:07 INFO BlockManagerMasterEndpoint: Registering block manager
10.16.129.152:34659 with 414.4 MB RAM, BlockManagerId(driver, 10.16.129.152,
34659, None)
18/05/06 01:25:07 INFO BlockManagerMaster: Registered BlockManager
BlockManagerId(driver, 10.16.129.152, 34659, None)
18/05/06 01:25:07 INFO BlockManager: external shuffle service port = 7337
18/05/06 01:25:07 INFO BlockManager: Initialized BlockManager:
BlockManagerId(driver, 10.16.129.152, 34659, None)
18/05/06 01:25:09 INFO EventLoggingListener: Logging events to
hdfs:///var/log/spark/apps/local-1525569907548
18/05/06 01:25:09 INFO SharedState: loading hive config file:
file:/etc/spark/conf.dist/hive-site.xml
18/05/06 01:25:09 INFO SharedState: Setting hive.metastore.warehouse.dir
('null') to the value of spark.sql.warehouse.dir
('hdfs:///user/spark/warehouse').
18/05/06 01:25:09 INFO SharedState: Warehouse path is
'hdfs:///user/spark/warehouse'.
18/05/06 01:25:11 INFO StateStoreCoordinatorRef: Registered
StateStoreCoordinator endpoint
18/05/06 01:25:12 INFO ConnectionQueryServicesImpl: An instance of
ConnectionQueryServices was created.
18/05/06 01:25:12 WARN RpcControllerFactory: Cannot load configured
"hbase.rpc.controllerfactory.class"
(org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory) from
hbase-site.xml, falling back to use default RpcControllerFactory
18/05/06 01:25:12 INFO RecoverableZooKeeper: Process
identifier=hconnection-0x50110971 connecting to ZooKeeper
ensemble=ZOOKEEPER_QUORUM_URL:2181
18/05/06 01:25:13 INFO MetricsConfig: loaded properties from
hadoop-metrics2.properties
18/05/06 01:25:13 INFO MetricsSystemImpl: Scheduled Metric snapshot period
at 300 second(s).
18/05/06 01:25:13 INFO MetricsSystemImpl: HBase metrics system started
18/05/06 01:25:13 INFO MetricRegistries: Loaded MetricRegistries class
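
Since the submit command above already ships the phoenix-spark jars on the executor classpath, one alternative worth trying is the Phoenix Spark DataSource instead of the generic jdbc format; it takes the table name and ZooKeeper URL directly. A minimal sketch, not from the original post, reusing the same `sqlContext`, table name, and quorum placeholder as above:

// Sketch: read the Phoenix table through the phoenix-spark integration
// (the phoenix-spark jar must be on both driver and executor classpaths).
Dataset<Row> fromPhx = sqlContext.read()
        .format("org.apache.phoenix.spark")
        .option("table", "STOCK")
        .option("zkUrl", "ZOOKEEPER_QUORUM_URL:2181")
        .load();
System.out.println(fromPhx.toJavaRDD().count());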

Spark with HBase on Spark Runtime 2.2.1

2018-05-05 Thread SparkUser6
I wrote a simple program to read data from HBase. The program works fine on
Cloudera backed by HDFS, on Spark Runtime 1.6, but it does NOT work on EMR
with Spark Runtime 2.2.1: I get an exception while testing against data on
EMR with S3.

// Spark conf
SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("My App");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// HBase conf
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.client.port", "2181");

// Submit scan into hbase conf
// conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
conf.set(TableInputFormat.INPUT_TABLE, "mytable");
conf.set(TableInputFormat.SCAN_ROW_START, "startrow");
conf.set(TableInputFormat.SCAN_ROW_STOP, "endrow");

// Get RDD
JavaPairRDD<ImmutableBytesWritable, Result> source = jsc.newAPIHadoopRDD(
        conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

// Process RDD
System.out.println("&&& " + source.count());



18/05/04 00:22:02 INFO MetricRegistries: Loaded MetricRegistries class org.apache.hadoop.hbase.metrics.impl.MetricRegistriesImpl
18/05/04 00:22:02 ERROR TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
Caused by: java.lang.IllegalAccessError: tried to access class org.apache.hadoop.metrics2.lib.MetricsInfoImpl from class org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry
    at org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry.newGauge(DynamicMetricsRegistry.java:139)
    at org.apache.hadoop.hbase.zookeeper.MetricsZooKeeperSourceImpl.<init>(MetricsZooKeeperSourceImpl.java:59)
    at org.apache.hadoop.hbase.zookeeper.MetricsZooKeeperSourceImpl.<init>(MetricsZooKeeperSourceImpl.java:51)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ... 42 more

Exception in thread "main" java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:270)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:256)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:125)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094) at
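
The IllegalAccessError between MetricsInfoImpl and DynamicMetricsRegistry points at two different Hadoop/HBase metrics builds on the classpath, a common symptom when an uber-jar bundles its own hadoop-common or hbase-hadoop-compat next to the ones EMR provides. A minimal debugging sketch, purely an assumption about how one might narrow it down, is to print which jar each of the colliding classes is actually loaded from:

// Hypothetical classpath probe, not from the original post: prints the jar
// that supplies each class involved in the IllegalAccessError, to spot duplicates.
public class WhichJar {
    public static void main(String[] args) throws Exception {
        String[] names = {
            "org.apache.hadoop.metrics2.lib.MetricsInfoImpl",
            "org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry",
            "org.apache.hadoop.hbase.zookeeper.MetricsZooKeeperSourceImpl"
        };
        for (String name : names) {
            Class<?> c = Class.forName(name);
            System.out.println(name + " -> "
                    + c.getProtectionDomain().getCodeSource().getLocation());
        }
    }
}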