Spark is not evenly distributing data
Getting Data From HBase Using Spark is Extremely Slow
I have written a simple four-line Spark program to process data in a Phoenix table:

    String queryString = getQueryFullString(); // Get data from Phoenix table: select col from table
    JavaPairRDD<NullWritable, TestWritable> phRDD = jsc.newAPIHadoopRDD(
        configuration,
        PhoenixInputFormat.class,
        NullWritable.class,
        TestWritable.class);
    JavaRDD<Long> rdd = phRDD.map(new Function<Tuple2<NullWritable, TestWritable>, Long>() {
        @Override // Goal is to scan all the data
        public Long call(Tuple2<NullWritable, TestWritable> tuple) throws Exception {
            return 1L;
        }
    });
    System.out.println(rdd.count());

This program takes 2 hours to process 2 million records. Can anyone help me understand what is wrong?
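Two things worth checking before digging deeper (a minimal diagnostic sketch against the phRDD above; getNumPartitions is plain Spark API, nothing Phoenix-specific):

    // If Phoenix handed Spark a single input split, the whole scan runs in
    // one task, which alone could explain a 2-hour runtime for 2M rows.
    System.out.println("input partitions = " + phRDD.getNumPartitions());
    // The map-to-1L step is unnecessary for a count; counting the pair RDD
    // directly triggers the same full scan with one less pass of per-row work.
    System.out.println(phRDD.count());

If the partition count comes back as 1, the thing to look at is the table's region/split layout rather than the Spark side of the job.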
Error submitting Spark Job in yarn-cluster mode on EMR
I have a simple program that works fine in local mode, but I am having issues when I try to run it in yarn-cluster mode. I know a NoSuchMethodError usually means the compile-time and runtime versions don't match, but I made sure I built against the same version.

    205 [main] INFO org.spark_project.jetty.server.ServerConnector - Started Spark@29539e36{HTTP/1.1}{0.0.0.0:4040}
    205 [main] INFO org.spark_project.jetty.server.Server - Started @3265ms
    Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.internal.config.package$.APP_CALLER_CONTEXT()Lorg/apache/spark/internal/config/OptionalConfigEntry;
        at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:163)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at com.voicebase.etl.PhoenixToElasticSearch.main(PhoenixToElasticSearch.java:54)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
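For the record, APP_CALLER_CONTEXT is an internal Spark config entry that only exists in newer Spark releases, so this NoSuchMethodError almost always means the application jar bundles a different Spark than the one the cluster runs. A minimal sketch of the usual fix, assuming a Maven build (artifact and version below are illustrative; match them to the cluster):

    <!-- Marking Spark as "provided" keeps Spark classes out of the fat jar,
         so only the cluster's own Spark ends up on the runtime classpath. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.2.1</version> <!-- illustrative; compare with spark-submit --version -->
      <scope>provided</scope>
    </dependency>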
Unable to Connect to Apache Phoenix From Spark
Simple Java program to connect to a Phoenix DB:

    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("Using-spark-phoenix-df");
    sparkConf.setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    Dataset<Row> fromPhx = sqlContext
        .read()
        .format("jdbc")
        .options(ImmutableMap.of(
            "driver", "org.apache.phoenix.jdbc.PhoenixDriver",
            "url", "jdbc:phoenix:ZOOKEEPER_QUORUM_URL:/hbase",
            "dbtable", "STOCK"))
        .load();
    System.out.println(fromPhx.toJavaRDD().count());

Submitted with:

    spark-submit --class PhoenixToDataFrame \
      --master yarn-client --deploy-mode client --executor-memory 1g \
      --files /etc/hbase/conf/hbase-site.xml \
      --name hbasespark \
      --conf "spark.executor.extraClassPath=/usr/lib/phoenix/phoenix-spark-4.13.0-HBase-1.4.jar:/usr/lib/phoenix/phoenix-core-4.13.0-HBase-1.4.jar:/usr/lib/phoenix-4.13.0-HBase-1.4-client.jar:/usr/lib/phoenix-client.jar:/usr/lib/phoenix-server.jar" \
      hbaseandspark-0.0.1-SNAPSHOT-jar-with-dependencies.jar

ERROR:

    18/05/06 01:25:06 INFO Utils: Successfully started service 'sparkDriver' on port 34053.
    18/05/06 01:25:06 INFO SparkEnv: Registering MapOutputTracker
    18/05/06 01:25:06 INFO SparkEnv: Registering BlockManagerMaster
    18/05/06 01:25:06 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    18/05/06 01:25:06 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    18/05/06 01:25:06 INFO DiskBlockManager: Created local directory at /mnt/tmp/blockmgr-08e5b083-e2bb-4389-8b30-29a1be08ee7e
    18/05/06 01:25:06 INFO MemoryStore: MemoryStore started with capacity 414.4 MB
    18/05/06 01:25:07 INFO SparkEnv: Registering OutputCommitCoordinator
    18/05/06 01:25:07 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    18/05/06 01:25:07 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.16.129.152:4040
    18/05/06 01:25:07 INFO SparkContext: Added JAR file:/home/ec2-user/test/target/hbaseandspark-0.0.1-SNAPSHOT-jar-with-dependencies.jar at spark://10.16.129.152:34053/jars/hbaseandspark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1525569907456
    18/05/06 01:25:07 INFO Executor: Starting executor ID driver on host localhost
    18/05/06 01:25:07 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34659.
    18/05/06 01:25:07 INFO NettyBlockTransferService: Server created on 10.16.129.152:34659
    18/05/06 01:25:07 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    18/05/06 01:25:07 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.16.129.152, 34659, None)
    18/05/06 01:25:07 INFO BlockManagerMasterEndpoint: Registering block manager 10.16.129.152:34659 with 414.4 MB RAM, BlockManagerId(driver, 10.16.129.152, 34659, None)
    18/05/06 01:25:07 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.16.129.152, 34659, None)
    18/05/06 01:25:07 INFO BlockManager: external shuffle service port = 7337
    18/05/06 01:25:07 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.16.129.152, 34659, None)
    18/05/06 01:25:09 INFO EventLoggingListener: Logging events to hdfs:///var/log/spark/apps/local-1525569907548
    18/05/06 01:25:09 INFO SharedState: loading hive config file: file:/etc/spark/conf.dist/hive-site.xml
    18/05/06 01:25:09 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('hdfs:///user/spark/warehouse').
    18/05/06 01:25:09 INFO SharedState: Warehouse path is 'hdfs:///user/spark/warehouse'.
    18/05/06 01:25:11 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    18/05/06 01:25:12 INFO ConnectionQueryServicesImpl: An instance of ConnectionQueryServices was created.
    18/05/06 01:25:12 WARN RpcControllerFactory: Cannot load configured "hbase.rpc.controllerfactory.class" (org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory) from hbase-site.xml, falling back to use default RpcControllerFactory
    18/05/06 01:25:12 INFO RecoverableZooKeeper: Process identifier=hconnection-0x50110971 connecting to ZooKeeper ensemble=ZOOKEEPER_QUORUM_URL:2181
    18/05/06 01:25:13 INFO MetricsConfig: loaded properties from hadoop-metrics2.properties
    18/05/06 01:25:13 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 300 second(s).
    18/05/06 01:25:13 INFO MetricsSystemImpl: HBase metrics system started
    18/05/06 01:25:13 INFO MetricRegistries: Loaded MetricRegistries class
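One hedged suggestion (a sketch, not a confirmed fix): the plain "jdbc" source reads the whole table through a single connection, while the phoenix-spark connector already on the extraClassPath reads one partition per region. The table name and quorum placeholder below are taken from the post:

    // Sketch assuming the phoenix-spark data source from the jars above.
    Dataset<Row> fromPhx = sqlContext.read()
        .format("org.apache.phoenix.spark")
        .option("table", "STOCK")
        .option("zkUrl", "ZOOKEEPER_QUORUM_URL:2181") // quorum placeholder from the post
        .load();
    System.out.println(fromPhx.count());

If staying with JDBC, it is also worth double-checking the URL: the usual full form is jdbc:phoenix:<quorum>:2181:/hbase, with the ZooKeeper port before the root node.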
Spark with HBase on Spark Runtime 2.2.1
I wrote a simple program to read data from HBase. The program works fine on Spark Runtime 1.6 on Cloudera, backed by HDFS, but does NOT work on EMR with Spark Runtime 2.2.1: I get the exception below while testing against data on EMR with S3.

    // Spark conf
    SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("My App");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // HBase conf
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "localhost");
    conf.set("hbase.zookeeper.property.client.port", "2181");
    // Submit scan into hbase conf
    // conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
    conf.set(TableInputFormat.INPUT_TABLE, "mytable");
    conf.set(TableInputFormat.SCAN_ROW_START, "startrow");
    conf.set(TableInputFormat.SCAN_ROW_STOP, "endrow");
    // Get RDD
    JavaPairRDD<ImmutableBytesWritable, Result> source = jsc
        .newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
    // Process RDD
    System.out.println("&&& " + source.count());

    18/05/04 00:22:02 INFO MetricRegistries: Loaded MetricRegistries class org.apache.hadoop.hbase.metrics.impl.MetricRegistriesImpl
    18/05/04 00:22:02 ERROR TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
    Caused by: java.lang.IllegalAccessError: tried to access class org.apache.hadoop.metrics2.lib.MetricsInfoImpl from class org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry
        at org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry.newGauge(DynamicMetricsRegistry.java:139)
        at org.apache.hadoop.hbase.zookeeper.MetricsZooKeeperSourceImpl.<init>(MetricsZooKeeperSourceImpl.java:59)
        at org.apache.hadoop.hbase.zookeeper.MetricsZooKeeperSourceImpl.<init>(MetricsZooKeeperSourceImpl.java:51)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
        ... 42 more
    Exception in thread "main" java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:270)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:256)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:125)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
        at
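In case it helps anyone searching later: an IllegalAccessError between MetricsInfoImpl and DynamicMetricsRegistry is a symptom of the HBase client's hadoop-metrics2 classes ending up in a different classloader (or version) than the Hadoop classes EMR loads, typically because the jar-with-dependencies bundles its own copies. A hedged experiment, not a confirmed fix (both flags exist but are marked experimental in Spark):

    spark-submit \
      --conf spark.driver.userClassPathFirst=true \
      --conf spark.executor.userClassPathFirst=true \
      ... (rest of the submit command unchanged)

If that changes the error, the durable fix is aligning the HBase and Hadoop client versions in the build with the versions the EMR cluster actually ships.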