Advice on multiple streaming jobs

2018-05-05 Thread Dhaval Modi
Hi All,

Need advice on executing multiple streaming jobs.

Problem: We have hundreds of streaming jobs, and every streaming job needs its own UI port. Spark automatically tries ports from 4040 up to 4056 and fails after that. One workaround is to provide the port explicitly.

Is there a way to tackle this situation, or am I missing anything?
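
A minimal sketch of the explicit-port workaround (spark.ui.port, spark.port.maxRetries and spark.ui.enabled are standard Spark settings; the concrete values and the app name are only illustrative):

import org.apache.spark.SparkConf

// Pin the UI port per job, or widen the retry window beyond the default
// 16 ports (4040-4056). Disabling the UI avoids the port entirely for
// jobs that do not need it.
val conf = new SparkConf()
  .setAppName("streaming-job-01")            // illustrative app name
  .set("spark.ui.port", "4100")              // explicit UI port for this job
  .set("spark.port.maxRetries", "200")       // or: allow a wider retry range
// .set("spark.ui.enabled", "false")         // or: turn the UI off altogether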

Thanking you in advance.

Regards,
Dhaval Modi
dhavalmod...@gmail.com


help needed in performance improvement of spark structured streaming

2018-05-05 Thread amit kumar singh
Hi Community,

I have a use case where I need to call a stored procedure from Structured
Streaming.

I am able to send Kafka messages and call the stored procedure, but since the
foreach sink keeps executing the stored procedure per message, I want to
combine all the messages into a single DataFrame and then call the stored
procedure once per batch.

Is it possible to do this?


Current code:

select('value cast "string", 'topic)
  .select('topic, concat_ws(",", 'value cast "string") as 'value1)
  .groupBy('topic cast "string").count()
  .coalesce(1)
  .as[String]
  .writeStream
  .trigger(ProcessingTime("60 seconds"))
  .option("checkpointLocation", checkpointUrl)
  .foreach(new SimpleSqlServerSink(jdbcUrl, connectionProperties))
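
One way to avoid the per-message calls is to run the procedure once per micro-batch. A minimal Scala sketch, assuming Spark 2.4+ where the foreachBatch sink is available; checkpointUrl and jdbcUrl are reused from the code above, while kafkaDf and callStoredProcedure are assumed stand-ins, not part of this thread:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}
import org.apache.spark.sql.streaming.Trigger

// Sketch only: foreachBatch (Spark 2.4+) hands the whole micro-batch to the
// callback, so the stored procedure runs once per trigger instead of once per row.
val query = kafkaDf
  .selectExpr("CAST(value AS STRING) AS value", "topic")
  .writeStream
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .option("checkpointLocation", checkpointUrl)
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Collapse the batch to one row per topic with all values concatenated.
    val perTopic = batch
      .groupBy(col("topic"))
      .agg(concat_ws(",", collect_list(col("value"))).as("value1"))
    perTopic.collect().foreach { row =>
      callStoredProcedure(jdbcUrl, row.getString(0), row.getString(1)) // assumed helper
    }
  }
  .start()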




thanks
rohit


Spark with HBase on Spark Runtime 2.2.1

2018-05-05 Thread SparkUser6
I wrote a simple program to read data from HBase. The program works fine on
Cloudera (backed by HDFS) with Spark runtime 1.6, but it does NOT work on EMR
with Spark runtime 2.2.1.

I am getting an exception while testing the data on EMR with S3.

// Spark conf
SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("My App");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// HBase conf
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.client.port", "2181");

// Submit scan into hbase conf
// conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));

conf.set(TableInputFormat.INPUT_TABLE, "mytable");
conf.set(TableInputFormat.SCAN_ROW_START, "startrow");
conf.set(TableInputFormat.SCAN_ROW_STOP, "endrow");

// Get RDD
JavaPairRDD<ImmutableBytesWritable, Result> source =
    jsc.newAPIHadoopRDD(conf, TableInputFormat.class,
        ImmutableBytesWritable.class, Result.class);

// Process RDD
System.out.println("&&& " + source.count());



18/05/04 00:22:02 INFO MetricRegistries: Loaded MetricRegistries class org.apache.hadoop.hbase.metrics.impl.MetricRegistriesImpl
18/05/04 00:22:02 ERROR TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
Caused by: java.lang.IllegalAccessError: tried to access class org.apache.hadoop.metrics2.lib.MetricsInfoImpl from class org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry
    at org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry.newGauge(DynamicMetricsRegistry.java:139)
    at org.apache.hadoop.hbase.zookeeper.MetricsZooKeeperSourceImpl.<init>(MetricsZooKeeperSourceImpl.java:59)
    at org.apache.hadoop.hbase.zookeeper.MetricsZooKeeperSourceImpl.<init>(MetricsZooKeeperSourceImpl.java:51)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ... 42 more

Exception in thread "main" java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:270)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:256)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:125)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
    at org.apache.spark.api.java.

Unable to Connect to Apache Phoenix From Spark

2018-05-05 Thread SparkUser6
Simple Java Program to Connect to Phoenix DB:
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("Using-spark-phoenix-df");
sparkConf.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

Dataset<Row> fromPhx = sqlContext
    .read()
    .format("jdbc")
    .options(ImmutableMap.of(
        "driver", "org.apache.phoenix.jdbc.PhoenixDriver",
        "url", "jdbc:phoenix:ZOOKEEPER_QUORUM_URL:/hbase",
        "dbtable", "STOCK"))
    .load();

System.out.println(fromPhx.toJavaRDD().count());

spark-submit --class PhoenixToDataFrame \
  --master yarn-client --deploy-mode client --executor-memory 1g \
  --files /etc/hbase/conf/hbase-site.xml \
  --name hbasespark \
  --conf "spark.executor.extraClassPath=/usr/lib/phoenix/phoenix-spark-4.13.0-HBase-1.4.jar:/usr/lib/phoenix/phoenix-core-4.13.0-HBase-1.4.jar:/usr/lib/phoenix-4.13.0-HBase-1.4-client.jar:/usr/lib/phoenix-client.jar:/usr/lib/phoenix-server.jar" \
  hbaseandspark-0.0.1-SNAPSHOT-jar-with-dependencies.jar
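
For comparison, the phoenix-spark connector (whose jars are already on spark.executor.extraClassPath above) can also be used instead of the plain JDBC data source. A minimal Scala sketch, reusing the ZOOKEEPER_QUORUM_URL placeholder from the program above; this is only an illustration, not a verified fix for the error below:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Using-spark-phoenix-df")
  .master("local[*]")
  .getOrCreate()

// phoenix-spark reads the table through the Phoenix client
// rather than through the generic JDBC data source.
val stock = spark.read
  .format("org.apache.phoenix.spark")
  .option("table", "STOCK")
  .option("zkUrl", "ZOOKEEPER_QUORUM_URL:2181")  // ZooKeeper quorum placeholder
  .load()

println(stock.count())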

ERROR:

18/05/06 01:25:06 INFO Utils: Successfully started service 'sparkDriver' on
port 34053.
18/05/06 01:25:06 INFO SparkEnv: Registering MapOutputTracker
18/05/06 01:25:06 INFO SparkEnv: Registering BlockManagerMaster
18/05/06 01:25:06 INFO BlockManagerMasterEndpoint: Using
org.apache.spark.storage.DefaultTopologyMapper for getting topology
information
18/05/06 01:25:06 INFO BlockManagerMasterEndpoint:
BlockManagerMasterEndpoint up
18/05/06 01:25:06 INFO DiskBlockManager: Created local directory at
/mnt/tmp/blockmgr-08e5b083-e2bb-4389-8b30-29a1be08ee7e
18/05/06 01:25:06 INFO MemoryStore: MemoryStore started with capacity 414.4
MB
18/05/06 01:25:07 INFO SparkEnv: Registering OutputCommitCoordinator
18/05/06 01:25:07 INFO Utils: Successfully started service 'SparkUI' on port
4040.
18/05/06 01:25:07 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
http://10.16.129.152:4040
18/05/06 01:25:07 INFO SparkContext: Added JAR
file:/home/ec2-user/test/target/hbaseandspark-0.0.1-SNAPSHOT-jar-with-dependencies.jar
at
spark://10.16.129.152:34053/jars/hbaseandspark-0.0.1-SNAPSHOT-jar-with-dependencies.jar
with timestamp 1525569907456
18/05/06 01:25:07 INFO Executor: Starting executor ID driver on host
localhost
18/05/06 01:25:07 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 34659.
18/05/06 01:25:07 INFO NettyBlockTransferService: Server created on
10.16.129.152:34659
18/05/06 01:25:07 INFO BlockManager: Using
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
policy
18/05/06 01:25:07 INFO BlockManagerMaster: Registering BlockManager
BlockManagerId(driver, 10.16.129.152, 34659, None)
18/05/06 01:25:07 INFO BlockManagerMasterEndpoint: Registering block manager
10.16.129.152:34659 with 414.4 MB RAM, BlockManagerId(driver, 10.16.129.152,
34659, None)
18/05/06 01:25:07 INFO BlockManagerMaster: Registered BlockManager
BlockManagerId(driver, 10.16.129.152, 34659, None)
18/05/06 01:25:07 INFO BlockManager: external shuffle service port = 7337
18/05/06 01:25:07 INFO BlockManager: Initialized BlockManager:
BlockManagerId(driver, 10.16.129.152, 34659, None)
18/05/06 01:25:09 INFO EventLoggingListener: Logging events to
hdfs:///var/log/spark/apps/local-1525569907548
18/05/06 01:25:09 INFO SharedState: loading hive config file:
file:/etc/spark/conf.dist/hive-site.xml
18/05/06 01:25:09 INFO SharedState: Setting hive.metastore.warehouse.dir
('null') to the value of spark.sql.warehouse.dir
('hdfs:///user/spark/warehouse').
18/05/06 01:25:09 INFO SharedState: Warehouse path is
'hdfs:///user/spark/warehouse'.
18/05/06 01:25:11 INFO StateStoreCoordinatorRef: Registered
StateStoreCoordinator endpoint
18/05/06 01:25:12 INFO ConnectionQueryServicesImpl: An instance of
ConnectionQueryServices was created.
18/05/06 01:25:12 WARN RpcControllerFactory: Cannot load configured
"hbase.rpc.controllerfactory.class"
(org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory) from
hbase-site.xml, falling back to use default RpcControllerFactory
18/05/06 01:25:12 INFO RecoverableZooKeeper: Process
identifier=hconnection-0x50110971 connecting to ZooKeeper
ensemble=ZOOKEEPER_QUORUM_URL:2181
18/05/06 01:25:13 INFO MetricsConfig: loaded properties from
hadoop-metrics2.properties
18/05/06 01:25:13 INFO MetricsSystemImpl: Scheduled Metric snapshot period
at 300 second(s).
18/05/06 01:25:13 INFO MetricsSystemImpl: HBase metrics system started
18/05/06 01:25:13 INFO MetricRegistries: Loaded MetricRegistries class
org.apache