Can't access the data in Kafka Spark Streaming globally
I am trying to stream data from Kafka into Spark:

    JavaPairInputDStream<String, String> directKafkaStream =
        KafkaUtils.createDirectStream(ssc, String.class, String.class,
            StringDecoder.class, StringDecoder.class, kafkaParams, topics);

Here I am iterating over the JavaPairInputDStream to process the RDDs:

    directKafkaStream.foreachRDD(rdd -> {
        rdd.foreachPartition(items -> {
            while (items.hasNext()) {
                String[] state = items.next()._2.split("\\,");
                System.out.println(state[2] + "," + state[3] + "," + state[4] + "--");
            }
        });
    });

Inside the closure I can access the String array, but when I try to access its data globally (outside the closure) I cannot. My requirement is to join this data against a lookup table I have in Hive, which is why I need access to it outside the closure. Any suggestions please, thanks.

-- Best Regards, Sreeharsha Eedupuganti
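A driver-side global generally cannot be populated from inside foreachPartition, because those closures run on the executors, not on the driver. One alternative is to keep the work distributed and join each micro-batch against the Hive table inside foreachRDD. A minimal sketch, assuming a HiveContext built from the streaming context's SparkContext, a Hive lookup table named lookup_table, and an assumed join column; all of these names are placeholders for illustration:

```java
// Sketch only: runs inside the existing streaming job; hiveContext,
// lookup_table, and the column names are assumptions, not known values.
directKafkaStream.foreachRDD(rdd -> {
    // Parse each Kafka value into the three fields of interest
    JavaRDD<Row> rows = rdd.map(tuple -> {
        String[] f = tuple._2().split(",");
        return RowFactory.create(f[2], f[3], f[4]);
    });
    StructType schema = new StructType(new StructField[] {
        DataTypes.createStructField("col2", DataTypes.StringType, true),
        DataTypes.createStructField("col3", DataTypes.StringType, true),
        DataTypes.createStructField("col4", DataTypes.StringType, true)
    });
    DataFrame batch = hiveContext.createDataFrame(rows, schema);
    batch.registerTempTable("kafka_batch");
    // Join the micro-batch against the Hive lookup table on the executors
    DataFrame joined = hiveContext.sql(
        "SELECT b.*, l.* FROM kafka_batch b JOIN lookup_table l ON b.col2 = l.col2");
    joined.show();
});
```

This keeps the Hive lookup on the cluster side instead of trying to pull the parsed arrays back to the driver.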
How to perform Join operation using JAVARDD
I tried like this.

CrashData_1.csv:

    CRASH_KEY  CRASH_NUMBER  CRASH_DATE                CRASH_MONTH
    2016899114 2016899114    01/02/2016 12:00:00 AM +

CrashData_2.csv:

    CITY_NAME  ZIPCODE  CITY         STATE
    1945       704      PARC PARQUE  PR

Code:

    JavaRDD firstRDD = sc.textFile("/Users/apple/Desktop/CrashData_1.csv");
    JavaRDD secondRDD = sc.textFile("/Users/apple/Desktop/CrashData_2.csv");
    JavaRDD allRDD = firstRDD.union(secondRDD);

Output I am getting:

    [CRASH_KEY,CRASH_NUMBER,CRASH_DATE,CRASH_MONTH,
    2016899114,2016899114,01/02/2016 12:00:00 AM +
    CITY_NAME,ZIPCODE,CITY,STATE,
    1945,704,PARC PARQUE,PR]

Any suggestions please, thanks in advance.
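union() only concatenates the two RDDs, which is why the output interleaves both files; a join needs key/value pairs. A sketch, under the assumption that the first comma-separated column of each file is the shared join key (the sample rows shown don't share an obvious key, so the key extraction would need adjusting to the real data):

```java
// Key each line on its first CSV column, then join.
// "first column is the join key" is an assumption for illustration.
JavaPairRDD<String, String> left = firstRDD.mapToPair(line -> {
    String[] f = line.split(",", 2);          // key, rest-of-line
    return new Tuple2<>(f[0], f[1]);
});
JavaPairRDD<String, String> right = secondRDD.mapToPair(line -> {
    String[] f = line.split(",", 2);
    return new Tuple2<>(f[0], f[1]);
});
// One record per matching key: (key, (leftRest, rightRest))
JavaPairRDD<String, Tuple2<String, String>> joined = left.join(right);
```

join() here is an inner join; leftOuterJoin()/rightOuterJoin() exist for the outer variants.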
How to read a Multi Line json object via Spark
I tried from spark-shell and I am getting the following error. Here is the test.json file:

    {
      "colorsArray": [{
        "red": "#f00",
        "green": "#0f0",
        "blue": "#00f",
        "cyan": "#0ff",
        "magenta": "#f0f",
        "yellow": "#ff0",
        "black": "#000"
      }]
    }

    scala> val jtex = sqlContext.read.format("json").option("samplingRatio", "1.0").load("/user/spark/test.json")
    jtex: org.apache.spark.sql.DataFrame = [_corrupt_record: string]

Any suggestions please. Thanks.

-- Best Regards, Sreeharsha Eedupuganti, Data Engineer, innData Analytics Private Limited
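A _corrupt_record column usually means the reader could not parse the input: Spark's JSON source expects one complete JSON document per line, and a pretty-printed file spanning several lines fails that expectation. Two sketches of the usual workarounds (shown in Java; the spark-shell equivalents are analogous, and the applicable option depends on the Spark version):

```java
// Option 1 (Spark 2.2+): the reader can handle multi-line JSON documents itself
Dataset<Row> jtex = spark.read()
    .option("multiLine", true)
    .json("/user/spark/test.json");

// Option 2 (older versions): read each file whole, then parse the strings.
// This assumes every file is a single JSON document.
JavaRDD<String> wholeDocs = sc.wholeTextFiles("/user/spark/test.json")
    .map(pathAndContent -> pathAndContent._2());
DataFrame parsed = sqlContext.read().json(wholeDocs);
```

A third option is simply reformatting the file so the whole document sits on one line.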
Convert hive sql to spark sql
Hi users, I need to compare the performance of a query in Hive and in Spark. Can anyone convert this SQL to Spark SQL? Here is the query:

    SELECT split(DTD.TRAN_RMKS,'/')[0] AS TRAB_RMK1,
           split(DTD.TRAN_RMKS,'/')[1] AS ATM_ID,
           DTD.ACID,
           G.FORACID,
           DTD.REF_NUM,
           DTD.TRAN_ID,
           DTD.TRAN_DATE,
           DTD.VALUE_DATE,
           DTD.TRAN_PARTICULAR,
           DTD.TRAN_RMKS,
           DTD.TRAN_AMT,
           SYSDATE_ORA(),
           DTD.PSTD_DATE,
           DTD.PSTD_FLG,
           G.CUSTID,
           NULL AS PROC_FLG,
           DTD.PSTD_USER_ID,
           DTD.ENTRY_USER_ID,
           G.schemecode AS SCODE
    FROM DAILY_TRAN_DETAIL_TABLE2 DTD
    JOIN ods_gam G ON DTD.ACID = G.ACID
    WHERE substr(DTD.TRAN_PARTICULAR,1,3) rlike '(PUR|POS).*'
      AND DTD.PART_TRAN_TYPE = 'D'
      AND DTD.DEL_FLG <> 'Y'
      AND DTD.PSTD_FLG = 'Y'
      AND G.schemecode IN ('SBPRV','SBPRS','WSSTF','BGFRN','NREPV','NROPV','BSNRE','BSNRO')
      AND (SUBSTR(split(DTD.TRAN_RMKS,'/')[0],1,6) IN
             ('405997','406228','406229','415527','415528','417917','417918','418210',
              '421539','421572','432198','435736','450502','450503','450504','468805',
              '469190','469191','469192','474856','478286','478287','486292','490222',
              '490223','490254','512932','512932','514833','522346','522352','524458',
              '526106','526701','527114','527479','529608','529615','529616','532731',
              '532734','533102','534680','536132','536610','536621','539149','539158',
              '549751','557654','607118','607407','607445','607529','652189','652190',
              '652157')
           OR SUBSTR(split(DTD.TRAN_RMKS,'/')[0],1,8) IN
             ('53270200','53270201','53270202','60757401','60757402'))
    LIMIT 50;
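For what it's worth, Spark's HiveContext accepts HiveQL directly, so a query like this can usually be submitted unchanged once the tables are visible in the metastore; only SYSDATE_ORA() would need to be registered as a UDF in Spark if it is a custom Hive function. A sketch, with the statement abbreviated to a few columns purely for readability (the real query text would be pasted in whole):

```java
// Sketch: HiveContext runs HiveQL as-is. The abbreviated query below is a
// stand-in for the full statement from the post.
HiveContext hc = new HiveContext(sc);
DataFrame result = hc.sql(
    "SELECT split(DTD.TRAN_RMKS,'/')[0] AS TRAB_RMK1, DTD.ACID, G.FORACID " +
    "FROM DAILY_TRAN_DETAIL_TABLE2 DTD JOIN ods_gam G ON DTD.ACID = G.ACID " +
    "WHERE DTD.PART_TRAN_TYPE = 'D' AND DTD.PSTD_FLG = 'Y' LIMIT 50");
result.show();
```

So the conversion effort is mostly about table visibility and UDF registration rather than rewriting the SQL.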
How to convert List into json object / json Array
Here is the snippet of my code:

    Dataset<Row> rows_salaries = spark.read().json("/Users/Macbook/Downloads/rows_salaries.json");
    rows_salaries.createOrReplaceTempView("salaries");
    List<Row> df = spark.sql("select * from salaries").collectAsList();

I need to read the JSON data out of the resulting list, but I am unable to convert 'df' to either a JSONObject or a JSONArray. Is this the right approach, or is there another way to fetch the JSON data? Any suggestions please. Thanks.
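One way to avoid converting Row objects by hand: Dataset.toJSON() renders every row as a JSON string, and a JSON library can parse those directly. A sketch assuming the org.json library is on the classpath (any JSON parser would do):

```java
// Sketch: toJSON() yields one JSON string per row, so no Row-to-JSON
// conversion is needed. org.json on the classpath is an assumption.
List<String> jsonRows = spark.sql("select * from salaries").toJSON().collectAsList();
JSONArray array = new JSONArray();
for (String row : jsonRows) {
    array.put(new JSONObject(row));   // each row string is a JSON object
}
```

The resulting JSONArray holds one JSONObject per table row.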
Converting Dataframe to resultSet in Spark Java
I retrieved the data into a DataFrame, but I can't convert it into a ResultSet. Is there any possible way to do this conversion? Any suggestions please.

    Exception in thread "main" java.lang.ClassCastException:
    org.apache.spark.sql.DataFrame cannot be cast to com.datastax.driver.core.ResultSet
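A Spark DataFrame and the Cassandra driver's ResultSet are unrelated types, so the cast can never succeed. If the goal is just to iterate the query result, the DataFrame's own Row objects serve the same purpose; a sketch (the column position and type are assumptions for illustration):

```java
// Sketch: iterate the DataFrame's rows instead of casting to ResultSet.
Row[] rows = df.collect();
for (Row row : rows) {
    String firstColumn = row.getString(0);  // column 0 as String is an assumption
    System.out.println(firstColumn);
}
// If an actual com.datastax.driver.core.ResultSet is required, the query
// would have to be issued through the Cassandra driver's Session directly,
// not through Spark.
```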
Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?
Hi Spark users, I am new to Spark. I am trying to connect to Hive from Spark's Java API but I am unable to connect to the database: executing the code below shows only the "default" database. Can anyone help me out? What I need is a sample program for querying Hive from a Spark Java context. I also need to pass settings like these:

    userDF.registerTempTable("userRecordsTemp")
    sqlContext.sql("SET hive.default.fileformat=Orc")
    sqlContext.sql("set hive.enforce.bucketing = true")
    sqlContext.sql("set hive.enforce.sorting = true")

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("SparkSQL").setMaster("local");
        SparkContext ctx = new SparkContext(sparkConf);
        HiveContext hiveql = new org.apache.spark.sql.hive.HiveContext(ctx);
        DataFrame df = hiveql.sql("show databases");
        df.show();
    }

Any suggestions please. Thanks.
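Seeing only the "default" database usually means Spark is talking to a fresh local embedded metastore rather than the real Hive metastore. The common fixes are putting hive-site.xml on Spark's classpath (e.g. in conf/) or pointing at the metastore explicitly. A sketch of the latter; the thrift URI is an assumption about this setup:

```java
// Sketch: same program as above, but told where the real metastore lives.
SparkConf conf = new SparkConf().setAppName("SparkSQL").setMaster("local");
SparkContext ctx = new SparkContext(conf);
HiveContext hiveql = new HiveContext(ctx);
// Metastore URI is an assumed value; normally this comes from hive-site.xml
hiveql.setConf("hive.metastore.uris", "thrift://localhost:9083");
DataFrame dbs = hiveql.sql("SHOW DATABASES");
dbs.show();   // should now list the real databases, not just "default"
```

The SET statements from the post (hive.default.fileformat and the rest) can then be issued through the same hiveql.sql(...) calls.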
Content-based Recommendation Engine
Can anyone share code for a content-based recommendation engine that recommends users based on e-mail subjects?
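Not a complete engine, but one standard starting point for content-based recommendation over short texts is TF-IDF vectors plus cosine similarity. A sketch using Spark MLlib's HashingTF/IDF; the input file layout (one subject per line) and the tokenization are assumptions:

```java
// Sketch: TF-IDF featurization of e-mail subjects with spark.mllib.
// "one subject per line" and whitespace tokenization are assumptions.
JavaRDD<String> subjects = sc.textFile("subjects.txt");
JavaRDD<List<String>> tokens =
    subjects.map(s -> Arrays.asList(s.toLowerCase().split("\\s+")));

HashingTF tf = new HashingTF();
JavaRDD<Vector> tfVectors = tf.transform(tokens);
tfVectors.cache();                       // IDF makes two passes over the data

IDFModel idfModel = new IDF().fit(tfVectors);
JavaRDD<Vector> tfidf = idfModel.transform(tfVectors);

// Recommendation step (not shown): score candidate users by cosine
// similarity between their subject vectors,
// cos(v1, v2) = dot(v1, v2) / (norm(v1) * norm(v2)),
// and return the top-N most similar.
```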
Spark Select Statement
Hello Spark users, can we run a SQL SELECT statement in Spark using Java? I tried the code below, but how do I pass the database name? My database name is nimbus and the table name is winbox_opens.

Source code:

    public class Select {
        public static class SquareKey implements Function<Row, Integer> {
            public Integer call(Row row) throws Exception {
                return row.getInt(0) * row.getInt(0);
            }
        }

        public static void main(String[] args) throws Exception {
            SparkConf s = new SparkConf().setMaster("local[2]").setAppName("Select");
            SparkContext sc = new SparkContext(s);
            HiveContext hc = new HiveContext(sc);
            DataFrame rdd = hc.sql("SELECT * FROM winbox_opens");
            JavaRDD<Integer> squaredKeys = rdd.toJavaRDD().map(new SquareKey());
            List<Integer> result = squaredKeys.collect();
            for (Integer elem : result) {
                System.out.println(elem);
            }
        }
    }

Error:

    Exception in thread "main" org.apache.spark.sql.AnalysisException:
    no such table winbox_prod_action_logs_1; line 1 pos 14
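In HiveQL the database can be selected either by qualifying the table name or by issuing USE first, so a sketch of the two usual forms (assuming the nimbus database and winbox_opens table from the post actually exist in the metastore):

```java
// Sketch: two equivalent ways to address a table in a named database.
DataFrame viaQualifiedName = hc.sql("SELECT * FROM nimbus.winbox_opens");

hc.sql("USE nimbus");
DataFrame viaUse = hc.sql("SELECT * FROM winbox_opens");
```

Note that the pasted error complains about a different table (winbox_prod_action_logs_1) than the code queries, which suggests the error came from a different run.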
Need a sample code to load XML files into cassandra database using spark streaming
Hello everyone, I am new to Spark Streaming and need sample code to load XML files from an AWS S3 bucket into a Cassandra database. Any suggestions please, thanks in advance.
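One common route uses two external packages: spark-xml to parse the files into a DataFrame and spark-cassandra-connector to write it out. A sketch only; the row tag, S3 path, keyspace, and table names are all assumptions, and both packages must be added to the job's dependencies:

```java
// Sketch: S3 XML -> DataFrame -> Cassandra, via external packages.
DataFrame xml = sqlContext.read()
    .format("com.databricks.spark.xml")
    .option("rowTag", "record")                // assumed XML element per row
    .load("s3n://my-bucket/input/*.xml");      // assumed bucket/path

Map<String, String> cassandraOptions = new HashMap<>();
cassandraOptions.put("keyspace", "my_keyspace");  // assumed
cassandraOptions.put("table", "my_table");        // assumed, must pre-exist

xml.write()
   .format("org.apache.spark.sql.cassandra")
   .options(cassandraOptions)
   .mode(SaveMode.Append)
   .save();
```

This particular pipeline is plain Spark SQL; the streaming part only matters if new files keep arriving, as discussed in the follow-up posts.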
How to migrate spark code to spark streaming ?
Hello everyone, I am loading XML files from S3 into a database [i.e., Cassandra]. Right now my code is written against Spark Core, and I want to migrate it to Spark Streaming because we have to load new XML files into the database every 15 minutes. Any suggestions please. Thanks in advance.
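For a "new files every N minutes" workload, a streaming context with a matching batch interval can watch the S3 directory, and each batch's RDD can be handed to the existing Spark Core loading code. A sketch with assumed paths; note that textFileStream is line-oriented, so XML documents spanning multiple lines would instead need a whole-file fileStream or a periodic batch job:

```java
// Sketch: watch an S3 directory, one batch every 15 minutes.
// The S3 path is an assumption; new files dropped there appear in batches.
JavaStreamingContext ssc =
    new JavaStreamingContext(sparkConf, Durations.minutes(15));
JavaDStream<String> incoming = ssc.textFileStream("s3n://my-bucket/incoming/");

incoming.foreachRDD(rdd -> {
    // Reuse the existing Spark Core parse-and-save-to-Cassandra logic here,
    // applied to just this batch's records.
});

ssc.start();
ssc.awaitTermination();
```

An alternative worth weighing: if files really arrive only every 15 minutes, a scheduled batch job (cron + spark-submit) over the existing Spark Core code is often simpler than migrating to streaming.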
How to send a file to database using spark streaming
I am new to Spark Streaming. My question: I want to load XML files into a database [Cassandra] using Spark Streaming. Any suggestions please. Thanks in advance.
Getting an error while submitting spark jar
This is how I am submitting the jar:

    hadoop@localhost:/usr/local/hadoop/spark$ ./bin/spark-submit \
      --class mllib.perf.TesRunner \
      --master spark://localhost:7077 \
      --executor-memory 2G \
      --total-executor-cores 100 \
      /usr/local/hadoop/spark/lib/mllib-perf-tests-assembly.jar \
      1000

And here is my error:

    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    java.lang.ClassNotFoundException: mllib.perf.TesRunner
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:538)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    hadoop@localhost:/usr/local/hadoop/spark$

Thanks in advance.
Getting an error in insertion to mysql through sparkcontext in java..
I have 9 rows in my MySQL table. These are the JDBC options I am passing:

    options.put("dbtable", "(select * from employee");
    options.put("lowerBound", "1");
    options.put("upperBound", "8");
    options.put("numPartitions", "2");

Error:

    Parameter index out of range (1 > number of parameters, which is 0)
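Two things in those options look likely to cause trouble (both diagnoses are assumptions about this setup): the dbtable subquery is missing its closing parenthesis and alias, and lowerBound/upperBound/numPartitions only take effect together with a partitionColumn. A sketch of the corrected options; the URL, credentials, and the id column are placeholders:

```java
// Sketch: JDBC read options with a terminated, aliased subquery and a
// partition column. URL and column name are assumed values.
Map<String, String> options = new HashMap<>();
options.put("url", "jdbc:mysql://localhost:3306/mydb");   // assumed URL
options.put("dbtable", "(select * from employee) as emp"); // closed + aliased
options.put("partitionColumn", "id");  // assumed numeric column to split on
options.put("lowerBound", "1");
options.put("upperBound", "8");
options.put("numPartitions", "2");

DataFrame employees = sqlContext.read().format("jdbc").options(options).load();
```

With partitionColumn set, Spark issues one query per partition with WHERE clauses over the bound range instead of a malformed parameterized statement.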