spark + SalesForce SSL HandShake Issue

2017-12-17 Thread kali.tumm...@gmail.com
Hi All, 

I was trying out the spark + Salesforce library on Cloudera 5.9 and I am having an SSL
handshake issue. Please check out my question on Stack Overflow; no one has
answered it.

The library works fine on Windows but fails when I try to run it on the Cloudera edge
node.

https://stackoverflow.com/questions/47820372/ssl-handshake-issue-on-cloudera-5-9

Has anyone tried out this spark + salesforce library (
https://github.com/springml/spark-salesforce) +
(https://github.com/springml/salesforce-wave-api)?


I am trying to use the salesforce wave api library
(https://github.com/springml/salesforce-wave-api) in a Cloudera 5.9 cluster to
get data. I have to use a proxy because, from our cluster, that is the only way to
communicate with the outside world, so I made changes to the library to take a proxy
host and port. Below are the places where I made changes.

Change 1:-

config.setProxy("myenterpiseproxy server",port);
https://github.com/springml/salesforce-wave-api/blob/0ac76aeb2221d9e7038229fd352a8694e8cde7e9/src/main/java/com/springml/salesforce/wave/util/SFConfig.java#L101

Change 2:-

HttpHost proxy = new HttpHost("myenterpiseproxy server", port, "http");
https://github.com/springml/salesforce-wave-api/blob/0ac76aeb2221d9e7038229fd352a8694e8cde7e9/src/main/java/com/springml/salesforce/wave/util/HTTPHelper.java#L127

Change 3:-

RequestConfig requestConfig = RequestConfig.custom()
    .setSocketTimeout(timeout)
    .setConnectTimeout(timeout)
    .setConnectionRequestTimeout(timeout)
    .setProxy(proxy)
    .build();
https://github.com/springml/salesforce-wave-api/blob/0ac76aeb2221d9e7038229fd352a8694e8cde7e9/src/main/java/com/springml/salesforce/wave/util/HTTPHelper.java#L129

I built an application that uses salesforce-wave-api as a dependency, and when I try
to execute the jar I get an SSL handshake exception.

I passed in the javax.net.ssl.trustStore, javax.net.ssl.keyStore and https.protocols
of my cluster but I am still having problems.
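For the archive, here is a minimal sketch (not from the original code) of setting those same values as standard JVM system properties from Scala before the wave-api client is created. The property names are standard JVM ones; whether the library's HttpClient actually picks them up depends on how it builds its SSLContext, and the paths simply echo the run book below, so treat them as placeholders.

// Hedged sketch: standard JVM SSL/protocol properties, set before any HTTPS call is made.
System.setProperty("javax.net.ssl.trustStore",
  "/usr/java/jdk1.7.0_67-cloudera/jre/lib/security/jssecacerts")
System.setProperty("javax.net.ssl.keyStore",
  "/opt/cloudera/security/jks/uscvlpcldra-keystore.jks")
System.setProperty("https.protocols", "TLSv1.1,TLSv1.2")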

Has anyone had a similar issue? Has anyone tried to use this library in a
Cloudera cluster?

Run Book:-

java -cp
httpclient-4.5.jar:SFWaveApiTest-1.0-SNAPSHOT-jar-with-dependencies.jar
com.az.sfget.SFGetTest "username" "passwordwithtoken"
"https://test.salesforce.com/services/Soap/u/35; "select id,OWNERID from
someobject" "enterpiseproxyhost" "9400" "TLSv1.1,TLSv1.2"
"/usr/java/jdk1.7.0_67-cloudera/jre/lib/security/jssecacerts"
"/opt/cloudera/security/jks/uscvlpcldra-keystore.jks"
Error:-

javax.net.ssl.SSLHandshakeException: Remote host closed connection during
handshake
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:946)
at
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1312)
at
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1339)
at
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1323)
at
org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:394)
at
org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:353)
at
org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:134)
at
org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:353)
at
org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:388)
at
org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
at
org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
at
org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at
org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
at
com.springml.salesforce.wave.util.HTTPHelper.execute(HTTPHelper.java:122)
at com.springml.salesforce.wave.util.HTTPHelper.get(HTTPHelper.java:88)
at com.springml.salesforce.wave.util.HTTPHelper.get(HTTPHelper.java:92)
at
com.springml.salesforce.wave.impl.ForceAPIImpl.query(ForceAPIImpl.java:120)
at
com.springml.salesforce.wave.impl.ForceAPIImpl.query(ForceAPIImpl.java:36)
at com.az.sfget.SFGetTest.main(SFGetTest.java:54)
Caused by: java.io.EOFException: SSL peer shut down incorrectly
at sun.security.ssl.InputRecord.read(InputRecord.java:482)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
... 21 more


Thanks
Sri 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



sql to spark scala rdd

2016-07-29 Thread kali.tumm...@gmail.com
Hi All, 

I managed to write the business requirement in spark-sql and hive. I am still
learning scala; how would the SQL below be written using spark RDDs, not spark data
frames?

SELECT DATE,balance,
SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW) daily_balance
FROM  table
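Not from the original thread, but a minimal RDD-only sketch of the same running total, assuming the rows can be parsed into (date, balance) pairs; the input values are placeholders. It sorts by date, computes per-partition sums to build prefix offsets, and then carries a running accumulator through each partition.

import org.apache.spark.{SparkConf, SparkContext}

object RunningBalance {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("RunningBalance"))

    // Placeholder data standing in for the table's (DATE, balance) rows.
    val rows = sc.parallelize(Seq(("2016-07-01", 100.0), ("2016-07-02", 50.0), ("2016-07-03", 25.0)))

    // Sort by date so the cumulative sum follows the ORDER BY of the SQL window.
    val sorted = rows.sortBy(_._1)

    // Per-partition totals, collected to the driver to build prefix offsets.
    val partSums = sorted.mapPartitions(it => Iterator(it.map(_._2).sum)).collect()
    val offsets = partSums.scanLeft(0.0)(_ + _)   // sum of everything before each partition

    // Running sum inside each partition plus the offset of all earlier partitions.
    val dailyBalance = sorted.mapPartitionsWithIndex { (idx, it) =>
      var acc = offsets(idx)
      it.map { case (date, bal) => acc += bal; (date, bal, acc) }
    }

    dailyBalance.collect().foreach(println)
    sc.stop()
  }
}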





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark local dir to HDFS ?

2016-07-05 Thread kali.tumm...@gmail.com
Hi All, 

Can I set spark.local.dir to an HDFS location instead of the /tmp folder?

I tried setting the temp folder to HDFS but it didn't work. Can
spark.local.dir write to HDFS?

.set("spark.local.dir","hdfs://namednode/spark_tmp/")


16/07/05 15:35:47 ERROR DiskBlockManager: Failed to create local dir in
hdfs://namenode/spark_tmp/. Ignoring this directory.
java.io.IOException: Failed to create a temp directory (under
hdfs://namenode/spark_tmp/) after 10 attempts!
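For reference, a minimal sketch of the usual alternative, assuming local disks mounted at /data1 and /data2 (placeholder paths): spark.local.dir expects local filesystem directories for shuffle and spill files, and several can be given comma-separated.

import org.apache.spark.{SparkConf, SparkContext}

object LocalDirExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("LocalDirExample")
      // Local scratch directories (not HDFS URIs); list several disks to spread I/O.
      .set("spark.local.dir", "/data1/spark_tmp,/data2/spark_tmp")
    val sc = new SparkContext(conf)
    // ... job body ...
    sc.stop()
  }
}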


Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-local-dir-to-HDFS-tp27291.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark parquet too many small files ?

2016-07-01 Thread kali.tumm...@gmail.com
I found the JIRA for the issue. Will there be a fix in the future, or no fix?

https://issues.apache.org/jira/browse/SPARK-6221



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-parquet-too-many-small-files-tp27264p27267.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark parquet too many small files ?

2016-07-01 Thread kali.tumm...@gmail.com
Hi Neelesh,

As I said in my earlier emails, it's not a spark-Scala application; I am working on just
spark SQL.

I am launching the spark-SQL shell and running my hive code inside the spark SQL shell.

The spark SQL shell accepts functions which relate to spark SQL; it doesn't accept
functions like coalesce, which is a spark Scala function.

What I am trying to do is below.

from(select * from source_table where load_date="2016-09-23")a
Insert overwrite table target_table Select * 


Thanks
Sri

Sent from my iPhone

> On 1 Jul 2016, at 17:35, nsalian [via Apache Spark User List] 
>  wrote:
> 
> Hi Sri, 
> 
> Thanks for the question. 
> You can simply start by doing this in the initial stage: 
> 
> val sqlContext = new SQLContext(sc) 
> val customerList = sqlContext.read.json(args(0)).coalesce(20) //using a json 
> example here 
> 
> where the argument is the path to the file(s). This will reduce the 
> partitions. 
> You can proceed with repartitioning the data further on. The goal would be to 
> reduce the number of files in the end as you do a saveAsParquet. 
> 
> Hope that helps.
> Neelesh S. Salian 
> Cloudera
> 
> 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-parquet-too-many-small-files-tp27264p27266.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

spark parquet too many small files ?

2016-07-01 Thread kali.tumm...@gmail.com
Hi All, 

I am running hive in spark-sql in yarn client mode; the sql is pretty simple:
load dynamic partitions into a target parquet table.

I used hive configuration parameters such as (set
hive.merge.smallfiles.avgsize=25600; set
hive.merge.size.per.task=256000;) which usually merge small files up to a
256mb block size. Are these parameters supported in spark-sql, and is there another
way to merge a number of small parquet files into larger ones?

If it were a scala application I could use the coalesce() function or repartition, but
here we are not using a spark-scala application, it's just plain spark-sql.


Thanks
Sri 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-parquet-too-many-small-files-tp27264.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Best way to merge final output part files created by Spark job

2016-07-01 Thread kali.tumm...@gmail.com
Try using the coalesce function to repartition to the desired number of partition
files; to merge already-written output files, use hive and insert overwrite the table
using the options below.

set hive.merge.smallfiles.avgsize=256;
set hive.merge.size.per.task=256;
set 
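A minimal sketch of the coalesce approach mentioned above, for readers who can change the Spark job itself; the paths and the target partition count are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

object CoalesceOutput {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CoalesceOutput"))
    val result = sc.textFile("/data/input")          // stands in for the job's real output RDD
    // Collapse the final stage to 10 partitions so only 10 part files are written.
    result.coalesce(10).saveAsTextFile("/data/output_merged")
    sc.stop()
  }
}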



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-merge-final-output-part-files-created-by-Spark-job-tp24681p27263.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: output part files max size

2016-07-01 Thread kali.tumm...@gmail.com
I am not sure, but you can use the coalesce function to reduce the number of output
files.

Thanks
Sri



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/output-part-files-max-size-tp17013p27262.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark partition formula on standalone mode?

2016-06-27 Thread kali.tumm...@gmail.com
Hi All,

I have worked with spark installed on a Hadoop cluster but never with spark
on a standalone cluster.

My question is how to set the number of partitions in spark when it's running on a
spark standalone cluster.

With spark on Hadoop I calculate my formula using HDFS block sizes, but how do I
calculate it without an HDFS block size if spark is running on a standalone, non-Hadoop
cluster?
 
Partition formula for a 100gb file:-
HDFS block size:- 256mb

Partitions:- 100*1024 / 256 = 400 partitions

Executors:- 100/4 = 25

Executor memory:- 160gb/25 ≈ 7gb
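A minimal sketch of where those numbers would land in code on standalone, where there is no HDFS block size to derive from; the 400 echoes the formula above and the path is a placeholder.

import org.apache.spark.{SparkConf, SparkContext}

object StandalonePartitions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StandalonePartitions")
      // Default partition count for shuffles and parallelize on a standalone cluster.
      .set("spark.default.parallelism", "400")
    val sc = new SparkContext(conf)
    // The partition count can also be set per input via minPartitions.
    val data = sc.textFile("/data/100gb_file", minPartitions = 400)
    println(data.partitions.length)
    sc.stop()
  }
}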







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-partition-formula-on-standalone-mode-tp27237.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark sql broadcast join ?

2016-06-16 Thread kali.tumm...@gmail.com
Hi All, 

I have used broadcast joins in spark-scala applications, where I used partitionBy
(HashPartitioner) and then persist for wide dependencies. The present project
I am working on is pretty much a Hive migration to spark-sql, which is,
to be honest, pretty much sql, with no scala or python apps.

My question is how to achieve a broadcast join in plain spark-sql. At the moment
the join between two tables is taking ages.
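A hedged sketch of one SQL-only lever: spark.sql.autoBroadcastJoinThreshold is the size in bytes below which Spark SQL broadcasts a table on its own, and it can be raised with a plain SET statement (which also works directly in the spark-sql shell). The table names and the 100 MB threshold here are placeholders.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object BroadcastJoinSql {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastJoinSql"))
    val sqlContext = new SQLContext(sc)
    // Raise the auto-broadcast threshold (bytes) so the small dimension table qualifies.
    sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=104857600")
    val joined = sqlContext.sql(
      "SELECT f.*, d.name FROM fact_table f JOIN small_dim d ON f.key = d.key")
    joined.show(10)
    sc.stop()
  }
}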

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-broadcast-join-tp27184.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Saprk 1.6 Driver Memory Issue

2016-06-01 Thread kali.tumm...@gmail.com
Hi , 

I am using the spark-sql shell; while launching it I run it as spark-sql
--conf spark.driver.maxResultSize=20g

I tried using spark-sql --conf "spark.driver.maxResults"="20g" but still no
luck. Do I need to use a set command, something like

spark-sql --conf set "spark.driver.maxReults"="20g"

Is it --conf or -conf ?

Thanks







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saprk-1-6-Driver-Memory-Issue-tp27063p27066.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Saprk 1.6 Driver Memory Issue

2016-06-01 Thread kali.tumm...@gmail.com
Hi All , 

I am getting a spark driver memory issue even after overriding the conf by
using --conf spark.driver.maxResultSize=20g, and I also set it in my sql
script (set spark.driver.maxResultSize =16;), but the same error still
happens.

Job aborted due to stage failure: Total size of serialized results of 79
tasks (1035.2 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

Any thoughts ?

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saprk-1-6-Driver-Memory-Issue-tp27063.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



set spark 1.6 with Hive 0.14 ?

2016-05-20 Thread kali.tumm...@gmail.com
Hi All , 

Is there a way to ask spark and spark-sql to use Hive version 0.14 instead
of the inbuilt hive 1.2.1?

I am testing spark-sql locally with spark 1.6 downloaded from the internet. I
want to execute my hive queries in spark sql using hive version 0.14; can I
go back to the previous version just for a simple test?

Please share the steps involved.
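A hedged sketch of the two settings usually involved, assuming the Hive 0.14 client jars are available locally (the path is a placeholder): they point Spark's metastore client at an older Hive version, while query execution still uses the built-in Hive 1.2.1 classes.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object OlderMetastore {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("OlderMetastore")
      // Version of the Hive metastore to talk to, and a classpath with that version's jars.
      .set("spark.sql.hive.metastore.version", "0.14.0")
      .set("spark.sql.hive.metastore.jars", "/path/to/hive-0.14/lib/*")
    val sc = new SparkContext(conf)
    val hc = new HiveContext(sc)
    hc.sql("show tables").show()
    sc.stop()
  }
}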


Thanks
Sri



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/set-spark-1-6-with-Hive-0-14-tp26989.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to run latest version of spark in old version of spark in cloudera cluster ?

2016-01-27 Thread kali.tumm...@gmail.com
Hi All, 

I just realized the cloudera version of spark on my cluster is 1.2; the jar which
I built using maven is version 1.6, which is causing the issue.

Is there a way to run a spark 1.6 job on a cluster that ships spark 1.2?

Thanks
Sri 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-latest-version-of-spark-in-old-version-of-spark-in-cloudera-cluster-tp26087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



org.netezza.error.NzSQLException: ERROR: Invalid datatype - TEXT

2016-01-26 Thread kali.tumm...@gmail.com
Hi All, 

I am using the Spark JDBC DataFrame writer to store data into Netezza. I think spark is
trying to create the table using the data type TEXT for string columns, and netezza
doesn't support the data type TEXT.

How do I override the spark behaviour to use VARCHAR instead of the data type TEXT?

val sourcedfmode = sourcedf.persist(StorageLevel.MEMORY_AND_DISK).write.mode("overwrite")
sourcedfmode.jdbc(TargetDBinfo.url, TargetDBinfo.table, targetprops)

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/org-netezza-error-NzSQLException-ERROR-Invalid-datatype-TEXT-tp26072.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: org.netezza.error.NzSQLException: ERROR: Invalid datatype - TEXT

2016-01-26 Thread kali.tumm...@gmail.com
Fixed by creating a new Netezza dialect and registering it in JdbcDialects
using the JdbcDialects.registerDialect(NetezzaDialect) method
(spark/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala).


package com.citi.ocean.spark.elt

/**
 * Created by st84879 on 26/01/2016.
 */

import java.sql.{Connection, Types}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types._
import org.apache.spark.sql.jdbc.JdbcType


private object NetezzaDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:netezza")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int,
      md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
      Some(BinaryType)
    } else if (sqlType == Types.OTHER) {
      toCatalystType(typeName).filter(_ == StringType)
    } else if (sqlType == Types.ARRAY && typeName.length > 1 && typeName(0) == '_') {
      toCatalystType(typeName.drop(1)).map(ArrayType(_))
    } else None
  }

  // TODO: support more type names.
  private def toCatalystType(typeName: String): Option[DataType] = typeName match {
    case "bool" => Some(BooleanType)
    case "bit" => Some(BinaryType)
    case "int2" => Some(ShortType)
    case "int4" => Some(IntegerType)
    case "int8" | "oid" => Some(LongType)
    case "float4" => Some(FloatType)
    case "money" | "float8" => Some(DoubleType)
    case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
      Some(StringType)
    case "bytea" => Some(BinaryType)
    case "timestamp" | "timestamptz" | "time" | "timetz" => Some(TimestampType)
    case "date" => Some(DateType)
    case "numeric" => Some(DecimalType.SYSTEM_DEFAULT)
    case _ => None
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(1000)", Types.CHAR))
    case BinaryType => Some(JdbcType("BYTEA", Types.BINARY))
    case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
    case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
    case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
    case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
      getJDBCType(et).map(_.databaseTypeDefinition)
        .orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
        .map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
    case ByteType =>
      throw new IllegalArgumentException(s"Unsupported type in netezza: $dt")
    case _ => None
  }

  override def getTableExistsQuery(table: String): String = {
    s"SELECT 1 FROM $table LIMIT 1"
  }

  override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
    super.beforeFetch(connection, properties)

    // According to the postgres jdbc documentation we need to be in autocommit=false
    // if we actually want to have fetchsize be non 0 (all the rows). This allows us
    // to not have to cache all the rows inside the driver when fetching.
    //
    // See:
    // https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
    //
    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
      connection.setAutoCommit(false)
    }
  }
}
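A brief usage sketch (reusing sourcedf, TargetDBinfo and targetprops from the original post, which are not defined here): register the dialect once before the JDBC write so StringType maps to VARCHAR(1000) instead of TEXT.

import org.apache.spark.sql.jdbc.JdbcDialects

// Register once per application, before any DataFrame jdbc write.
JdbcDialects.registerDialect(NetezzaDialect)
sourcedf.write.mode("overwrite").jdbc(TargetDBinfo.url, TargetDBinfo.table, targetprops)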




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/org-netezza-error-NzSQLException-ERROR-Invalid-datatype-TEXT-tp26072p26075.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark 1.6 Issue

2016-01-08 Thread kali.tumm...@gmail.com
Hi All, 

It worked OK after adding the below VM options.

-Xms128m -Xmx512m -XX:MaxPermSize=300m -ea

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-6-Issue-tp25893p25920.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark 1.6 Issue

2016-01-06 Thread kali.tumm...@gmail.com
Hi All, 

I am running my app in IntelliJ IDEA (locally) with master local[*]. The code
worked fine with spark 1.5, but after I upgraded to 1.6 I am having the issue below.

Is this a bug in 1.6? If I change back to 1.5 it works without any error.
Do I need to pass executor memory while running locally in spark 1.6?

Exception in thread "main" java.lang.IllegalArgumentException: System memory
259522560 must be at least 4.718592E8. Please use a larger heap size.

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-6-Issue-tp25893.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Apache spark certification pass percentage ?

2015-12-22 Thread kali.tumm...@gmail.com
Hi All, 

Does anyone know the pass percentage for the Apache spark certification exam?

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-spark-certification-pass-percentage-tp25761.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to turn off spark streaming gracefully ?

2015-12-18 Thread kali.tumm...@gmail.com
Hi All, 

Imagine I have production spark streaming kafka (direct connection)
publisher and subscriber jobs running which publish and subscribe (receive)
data to and from a kafka topic, and I save one day's worth of data using
dstream.slice to a Cassandra daily table (so I create the daily table before
running the spark streaming job).

My question: if all the above code runs under some scheduler like autosys, how
should I tell the spark publisher to stop publishing at end of day, and the
spark subscriber to stop receiving, without killing the jobs? If I kill them, my
autosys scheduler turns red saying the job failed, etc.
Is there a way to stop both subscriber and publisher without killing or
terminating the code?
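A hedged sketch of one common pattern: keep the job running and have the driver watch for an external end-of-day flag, then stop the StreamingContext gracefully so in-flight batches finish before the process exits cleanly. The flag path and poll interval are placeholders.

import java.nio.file.{Files, Paths}
import org.apache.spark.streaming.StreamingContext

object GracefulStop {
  def awaitEndOfDay(ssc: StreamingContext): Unit = {
    ssc.start()
    // Poll for a flag file that the scheduler (or an end-of-day job) drops.
    while (!Files.exists(Paths.get("/tmp/stop_streaming_flag"))) {
      Thread.sleep(10000)
    }
    // Finish the batches already received, then shut down streaming and the SparkContext.
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}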

Thanks
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-turn-off-spark-streaming-gracefully-tp25734.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark data frame write.mode("append") bug

2015-12-12 Thread kali.tumm...@gmail.com
Hi All, 

https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48

In the present spark version, at line 48 there is a bug: checking whether a table
exists in a database using LIMIT doesn't work for all databases, SQL Server
for example.

The best way to check whether a table exists in any database is to use select *
from table where 1=2; or select 1 from table where 1=2; this is supported by all
databases.

Can this change be implemented in spark 1.6? It would make the write.mode("append")
bug go away.



def tableExists(conn: Connection, table: String): Boolean = {
  // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
  // SQL database systems, considering "table" could also include the database name.
  Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
}

Solution:-

def tableExists(conn: Connection, table: String): Boolean = {
  // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
  // SQL database systems, considering "table" could also include the database name.
  Try(conn.prepareStatement(s"SELECT 1 FROM $table where 1=2").executeQuery().next()).isSuccess
}



Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-data-frame-write-mode-append-bug-tp25650p25693.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark data frame write.mode("append") bug

2015-12-09 Thread kali.tumm...@gmail.com
Hi Spark Contributors,

I am trying to append data to a target table using the df.write.mode("append")
functionality, but spark throws a "table already exists" exception.

Is there a fix scheduled in a later spark release? I am using spark 1.5.

val sourcedfmode=sourcedf.write.mode("append")
sourcedfmode.jdbc(TargetDBinfo.url,TargetDBinfo.table,targetprops)

Full Code:-
https://github.com/kali786516/ScalaDB/blob/master/src/main/java/com/kali/db/SaprkSourceToTargetBulkLoad.scala

Spring Config File:-
https://github.com/kali786516/ScalaDB/blob/master/src/main/resources/SourceToTargetBulkLoad.xml


Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-data-frame-write-mode-append-bug-tp25650.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: can i write only RDD transformation into hdfs or any other storage system

2015-12-09 Thread kali.tumm...@gmail.com
Hi Prateek,

Do you mean writing spark output to any storage system? Yes, you can.

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-i-write-only-RDD-transformation-into-hdfs-or-any-other-storage-system-tp25637p25651.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Release data for spark 1.6?

2015-12-09 Thread kali.tumm...@gmail.com
Hi All, 

Does anyone know the exact release date for spark 1.6?

Thanks
Sri



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Release-data-for-spark-1-6-tp25654.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Exception in Spark-sql insertIntoJDBC command

2015-12-08 Thread kali.tumm...@gmail.com
Hi All, 

I have the same error in spark 1.5. Is there any solution to get around
this? I also tried using sourcedf.write.mode("append") but still no luck.

val sourcedfmode=sourcedf.write.mode("append")
sourcedfmode.jdbc(TargetDBinfo.url,TargetDBinfo.table,targetprops)

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-in-Spark-sql-insertIntoJDBC-command-tp24655p25640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark sql current time stamp function ?

2015-12-07 Thread kali.tumm...@gmail.com
Hi All, 

Is there a spark sql function which returns the current timestamp?

Example:- 
In Impala:- select NOW();
SQL Server:- select GETDATE();
Netezza:- select NOW();

Thanks
Sri



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-current-time-stamp-function-tp25620.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark sql current time stamp function ?

2015-12-07 Thread kali.tumm...@gmail.com
I found a way out.

import java.text.SimpleDateFormat
import java.util.Date;

val format = new SimpleDateFormat("yyyy-M-dd hh:mm:ss")

val testsql = sqlContext.sql("select column1,column2,column3,column4,column5,'%s' as TIME_STAMP from TestTable limit 10".format(format.format(new Date())))


Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-current-time-stamp-function-tp25620p25621.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark sql random number or sequence numbers ?

2015-12-07 Thread kali.tumm...@gmail.com
Hi All,

I have implemented random numbers using scala spark; is there a function to
get a row_number equivalent in spark sql?

example:-
sql server:- row_number()
Netezza:- sequence number
mysql:- sequence number

Example:-
val testsql=sqlContext.sql("select column1,column2,column3,column4,column5
,row_number() as random from TestTable limit 10")
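A hedged RDD-level sketch (reusing sqlContext and the TestTable columns from the example above): zipWithIndex attaches a stable 0-based index to each row, which can stand in for a sequence number when window functions are not available.

val withSeq = sqlContext.sql("select column1,column2,column3,column4,column5 from TestTable limit 10")
  .rdd
  .zipWithIndex()                              // (Row, 0-based index)
  .map { case (row, idx) => (idx + 1, row) }   // 1-based sequence number
withSeq.take(10).foreach(println)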

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-random-number-or-sequence-numbers-tp25623.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark sql data frames do they run in parallel by default?

2015-12-06 Thread kali.tumm...@gmail.com
Hi all, 

I wrote the spark code below to extract data from SQL Server using spark
sqlContext.read.format with several different options. My question: does the
sqlContext.read load function run in parallel by default, does it use all the
available cores? When I save the output to a file it
gets saved as one file; does that mean the code used a single core on my
machine?

My Code:-

package com.kali.db

/**
 * Created by kalit_000 on 06/12/2015.
 */
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark._
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.sql.DataFrame
import org.springframework.context.support.ClassPathXmlApplicationContext

case class
SparkSqlValueClass(driver:String,url:String,username:String,password:String,sql:String,table:String,opdelimeter:String)

object SparkSqlSelectSpring {

   def main (args: Array[String]) {
 Logger.getLogger("org").setLevel(Level.WARN)
 Logger.getLogger("akka").setLevel(Level.WARN)

 val conf = new
SparkConf().setMaster("local[*]").setAppName("SparkSqlConfigurable").set("spark.hadoop.validateOutputSpecs",
"false")
 val sc = new SparkContext(conf)

 def opfile(value:DataFrame,delimeter:String):RDD[String]=
 {
   value.map(x =>
x.toString.replace("[","").replace("]","").replace(",",delimeter))
 }


 //read the application context file
 val ctx = new ClassPathXmlApplicationContext("sparksql.xml")
 val DBinfo =
ctx.getBean("SparkSQLInst").asInstanceOf[SparkSqlValueClass]

 val driver = DBinfo.driver
 val url = DBinfo.url
 val username = DBinfo.username
 val password = DBinfo.password
 val query = DBinfo.sql
 val sqlquery = DBinfo.sql
 val table = DBinfo.table
 val opdelimeter=DBinfo.opdelimeter

 println("DB Driver:-%s".format(driver))
 println("DB Url:-%s".format(url))
 println("Username:-%s".format(username))
 println("Password:-%s".format(password))
 println("Query:-%s".format(query))
 println("Table:-%s".format(table))
 println("Opdelimeter:-%s".format(opdelimeter))

 try {
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 val df = sqlContext.read.format("jdbc").options(Map("url" ->
url,"dbtable" -> table,"driver" -> driver)).load()
 df.registerTempTable(table)
 val OP=sqlContext.sql(query)


opfile(OP,opdelimeter).saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\typesafe\\scaladbop\\op.txt")

 } catch {
   case e: Exception => e.printStackTrace
 }
 sc.stop()

  }

}

Spring Bean:-


Thanks
Sri




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-data-frames-do-they-run-in-parallel-by-default-tp25604.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark sql data frames do they run in parallel by default?

2015-12-06 Thread kali.tumm...@gmail.com
Hi All, 

I rewrote my code to use sqlContext.read.jdbc, which lets me specify
upperBound, lowerBound, numPartitions etc., so it might run in parallel.
I need to try it on a cluster, which I will do when I have time.
But please confirm: does read.jdbc do parallel reads?

Spark code:-


package com.kali.db

/**
 * Created by kalit_000 on 06/12/2015.
 */

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark._
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.sql.DataFrame
import org.springframework.context.support.ClassPathXmlApplicationContext

case class
SparkSqlValueClassMPP(driver:String,url:String,username:String,password:String,sql:String,table:String,opdelimeter:String,lowerbound:String,upperbound:String,numberofparitions:String,parallelizecolumn:String)

object SparkDBExtractorMPP {

  def main (args: Array[String]) {

Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local[*]").setAppName("SparkDBExtractorMPP").set("spark.hadoop.validateOutputSpecs",
"false")
val sc = new SparkContext(conf)

def opfile(value:DataFrame,delimeter:String):RDD[String]=
{
  value.map(x =>
x.toString.replace("[","").replace("]","").replace(",",delimeter))
}

//read the application context file
val ctx = new ClassPathXmlApplicationContext("sparkDBExtractorMpp.xml")
val DBinfo =
ctx.getBean("SparkSQLDBExtractorMPP").asInstanceOf[SparkSqlValueClassMPP]

val driver = DBinfo.driver
val url = DBinfo.url
val username = DBinfo.username
val password = DBinfo.password
val query = DBinfo.sql
val sqlquery = DBinfo.sql
val table = DBinfo.table
val opdelimeter=DBinfo.opdelimeter
val lowerbound=DBinfo.lowerbound.toInt
val upperbound=DBinfo.upperbound.toInt
val numberofpartitions=DBinfo.numberofparitions.toInt
val parallelizecolumn=DBinfo.parallelizecolumn


println("DB Driver:-%s".format(driver))
println("DB Url:-%s".format(url))
println("Username:-%s".format(username))
println("Password:-%s".format(password))
println("Query:-%s".format(query))
println("Table:-%s".format(table))
println("Opdelimeter:-%s".format(opdelimeter))
println("Lowerbound:-%s".format(lowerbound))
println("Upperbound:-%s".format(upperbound))
println("Numberofpartitions:-%s".format(numberofpartitions))
println("Parallelizecolumn:-%s".format(parallelizecolumn))

try {
val props=new Properties()
props.put("user",username)
props.put("password",password)
props.put("driver",driver)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df =
sqlContext.read.jdbc(url,table,parallelizecolumn,lowerbound,upperbound,numberofpartitions,props)

df.show(10)

df.registerTempTable(table)
val OP=sqlContext.sql(query)

   
opfile(OP,opdelimeter).saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\typesafe\\scaladbop\\op.txt")

} catch {
  case e: Exception => e.printStackTrace
}
sc.stop()
  }
}

Spring:-


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-data-frames-do-they-run-in-parallel-by-default-tp25604p25611.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does spark streaming write ahead log writes all received data to HDFS ?

2015-11-20 Thread kali.tumm...@gmail.com
Hi All,

If write ahead logs are enabled in spark streaming, does all the received
data get written to the HDFS path, or does it only write the metadata?
How does clean up work? Does the HDFS path get bigger and bigger every day;
do I need to write a clean up job to delete data from the write ahead logs
folder?
What does the write ahead log folder actually contain?

Thanks
Sri 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-spark-streaming-write-ahead-log-writes-all-received-data-to-HDFS-tp25439.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark inner join

2015-10-24 Thread kali.tumm...@gmail.com
Hi All, 

In sql, say for example I have table1 (movieid) and table2 (movieid, moviename).
In sql we write something like select moviename, movieid, count(1) from
table2 inner join table1 on table1.movieid=table2.movieid group by ...;
here in sql table1 has only one column whereas table2 has two columns and
the join still works. In the same way, can spark join on keys from both
rdd's?

When I tried to join two rdds in spark, it seemed both rdds needed the same
number of elements, so I had to add a dummy value 0, for example. Is there
another way around this, or am I doing it completely wrong?

val
lines=sc.textFile("C:\\Users\\kalit_000\\Desktop\\udemy_spark\\ml-100k\\u.data")
val
movienamesfile=sc.textFile("C:\\Users\\kalit_000\\Desktop\\udemy_spark\\ml-100k\\u.item")
val moviesid=lines.map(x => x.split("\t")).map(x => (x(1),0))
val test=moviesid.map(x => x._1)
val movienames=movienamesfile.map(x => x.split("\\|")).map(x =>
(x(0),x(1)))
val movienamejoined=moviesid.join(movienames).distinct()
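A hedged sketch (reusing the lines and movienamesfile RDDs from the code above): pair RDDs only need a common key type, so the value sides can differ and no dummy column is required.

val ratings = lines.map(_.split("\t")).map(a => (a(1), 1))                 // (movieId, 1)
val names   = movienamesfile.map(_.split("\\|")).map(a => (a(0), a(1)))    // (movieId, title)
val counts  = ratings.reduceByKey(_ + _)                                   // (movieId, ratingCount)
val joined  = counts.join(names)                                           // (movieId, (count, title))
joined.map { case (id, (n, title)) => (title, id, n) }.take(10).foreach(println)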

Thanks
Sri



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-inner-join-tp25193.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Saprk error:- Not a valid DFS File name

2015-10-23 Thread kali.tumm...@gmail.com
Hi All, 

I got this weird error when I tried to run spark in YARN-CLUSTER mode. I have
33 files and I am looping spark in bash over them one by one; most of them worked fine
except a few files.

Is the error below an HDFS or a spark error?

Exception in thread "Driver" java.lang.IllegalArgumentException: Pathname
/user/myid/-u/12:51/_temporary/0 from
hdfs://dev/user/myid/-u/12:51/_temporary/0 is not a valid DFS filename.

This is the file name which I passed to spark; does the file name cause the issue?

hdfs://dev/data/20151019/sipmktdata.ColorDataArchive.UTD.P4_M-P.v5.2015-09-18.txt.20150918

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saprk-error-Not-a-valid-DFS-File-name-tp25186.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Saprk error:- Not a valid DFS File name

2015-10-23 Thread kali.tumm...@gmail.com
Full Error:-
at
org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:195)
at
org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:104)
at
org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:831)
at
org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:827)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:827)
at
org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:820)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:305)
at
org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
at 
org.apache.spark.SparkHadoopWriter.preSetup(SparkHadoopWriter.scala:64)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1046)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:941)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:850)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1164)
at
com.citi.ocean.spark.SimpleMktDataFlow$.main(SimpleMktDataFlow.scala:106)
at com.citi.ocean.spark.SimpleMktDataFlow.main(SimpleMktDataFlow.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saprk-error-Not-a-valid-DFS-File-name-tp25186p25188.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



difference between rdd.collect().toMap to rdd.collectAsMap() ?

2015-10-20 Thread kali.tumm...@gmail.com
Hi All, 

Is there any performance impact if I use collectAsMap on my RDD instead of
rdd.collect().toMap?

I have a key-value rdd and I want to convert it to a HashMap. As far as I know,
collect() is not efficient on large data sets as it runs on the driver; can I use
collectAsMap instead, and is there any performance impact?

Original:-
 val QuoteHashMap=QuoteRDD.collect().toMap
 val QuoteRDDData=QuoteHashMap.values.toSeq
 val QuoteRDDSet=sc.parallelize(QuoteRDDData.map(x =>
x.toString.replace("(","").replace(")","")))
 QuoteRDDSet.saveAsTextFile(Quotepath)

Change:-
 val QuoteHashMap=QuoteRDD.collectAsMap()
 val QuoteRDDData=QuoteHashMap.values.toSeq
 val QuoteRDDSet=sc.parallelize(QuoteRDDData.map(x =>
x.toString.replace("(","").replace(")","")))
 QuoteRDDSet.saveAsTextFile(Quotepath)



Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/difference-between-rdd-collect-toMap-to-rdd-collectAsMap-tp25139.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Pass spark partition explicitly ?

2015-10-18 Thread kali.tumm...@gmail.com
Hi All, 

Can I pass the number of partitions to all the RDDs explicitly while submitting
the spark job, or do I need to mention it in my spark code itself?

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pass-spark-partition-explicitly-tp25113.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



can I use Spark as alternative for gem fire cache ?

2015-10-17 Thread kali.tumm...@gmail.com
Hi All, 

Can spark be used as an alternative to gemfire cache? We use gemfire
cache to save (cache) dimension data in memory, which is later used by our
custom-made Java ETL tool. Can I do something like the below?

Can I cache an RDD in memory for a whole day? As far as I know the RDD will be
gone once the spark job finishes executing (correct me if I am wrong).

Spark:-
create an RDD
rdd.persist()

Thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-I-use-Spark-as-alternative-for-gem-fire-cache-tp25106.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Output println info in LogMessage Info ?

2015-10-17 Thread kali.tumm...@gmail.com
Hi All, 

In Unix I can print a warning or info using LogMessage WARN "Hi All" or
LogMessage INFO "Hello World"; is there a similar thing in Spark?

Imagine I want to print the count of an RDD in the logs instead of using println.
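A minimal sketch using the log4j Logger (the same one the other posts' code already imports), with placeholder data: INFO and WARN messages go through log4j instead of stdout, so they end up in the driver log alongside Spark's own messages.

import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}

object LogCountExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("LogCountExample"))
    val log = Logger.getLogger(getClass.getName)

    val rdd = sc.parallelize(1 to 100)   // placeholder data
    val n = rdd.count()

    log.info(s"record count = $n")
    if (n == 0) log.warn("record count is zero")

    sc.stop()
  }
}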

Thanks
Sri 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Output-println-info-in-LogMessage-Info-tp25107.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is there any better way of writing this code

2015-10-12 Thread kali.tumm...@gmail.com
Hi All, 

I'm just wondering, is there a better way of writing the code below? I am new
to spark and I feel what I wrote is pretty simple, basic and straightforward;
is there a better way of writing it using the functional paradigm?

   val QuoteRDD=quotefile.map(x => x.split("\\|")).
  filter(line => line(0).contains("1017")).
  map(x => ((x(5)+x(4)) , (x(5),x(4),x(1) ,
  if (x(15) =="B")
(
  {if (x(25) == "") x(9)   else x(25)},
  {if (x(37) == "") x(11)  else x(37)}
  )
  else if (x(15) =="C" )
(
  {if (x(24) == "") (x(9))  else x(24)},
  {if (x(30) == "") (x(11)) else x(30)}
  )
  else if (x(15) =="A")
  {(x(9),x(11))}
  )))
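Not an authoritative rewrite, but one more idiomatic shape for the same logic (reusing the quotefile RDD and column positions from the code above): pull the per-record branching into a small helper that pattern-matches on the type flag.

def pricePair(x: Array[String]): (String, String) = x(15) match {
  case "B" => (if (x(25) == "") x(9) else x(25), if (x(37) == "") x(11) else x(37))
  case "C" => (if (x(24) == "") x(9) else x(24), if (x(30) == "") x(11) else x(30))
  case _   => (x(9), x(11))   // "A" and anything else fall back to the raw columns
}

val quoteRDD = quotefile.map(_.split("\\|"))
  .filter(line => line(0).contains("1017"))
  .map(x => (x(5) + x(4), (x(5), x(4), x(1), pricePair(x))))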

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-better-way-of-writing-this-code-tp25027.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Create hashmap using two RDD's

2015-10-10 Thread kali.tumm...@gmail.com
Got it ..., created hashmap and saved it to file please follow below steps ..

 val QuoteRDD=quotefile.map(x => x.split("\\|")).
  filter(line => line(0).contains("1017")).
  map(x => ((x(5)+x(4)) , (x(5),x(4),x(1) ,
  if (x(15) =="B")
(
  {if (x(25) == "") x(9)  else x(25)},
 {if (x(37) == "") x(11) else x(37)}
)
  else if (x(15) =="C" )
(
   {if (x(24) == "") (x(9))  else x(24)},
   {if (x(30) == "") (x(11)) else x(30)}
)
  else if (x(15) =="A")
 {(x(9),x(11))}
)))


val QuoteHashMap=QuoteRDD.collect().toMap
val test=QuoteHashMap.values.toSeq
val test2=sc.parallelize(test.map(x =>
x.toString.replace("(","").replace(")","")))
test2.saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\mkdata\\test.txt")
test2.collect().foreach(println)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Create-hashmap-using-two-RDD-s-tp24996p25014.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Create hashmap using two RDD's

2015-10-10 Thread kali.tumm...@gmail.com
Hi All, 

I changed my approach and now I am able to load data into a MAP and get
data out using the get command.

val QuoteRDD=quotefile.map(x => x.split("\\|")).
  filter(line => line(0).contains("1017")).
  map(x => ((x(5)+x(4)) , (x(5),x(4),x(1) ,
if (x(15) =="B") 
  if (x(25) =="") x(9)  else (x(25)),
if (x(37) =="") x(11) else (x(37)),
if (x(15) =="C") 
  if (x(24) =="") x(9)  else (x(24)),
if (x(30) =="") x(11) else (x(30)),
if (x(15) =="A") 
  x(9),
x(11)
  )))

val QuoteHashMap=QuoteRDD.collect().toMap
QuoteHashMap.get("CPHI08173").foreach(println)

The problem now is how to save the value data from the hashmap to a file. I need to
iterate over the keys in the hash and save the values to a file.

for ( ((k,v)) <-QuoteHashMap) QuoteHashMap.get(k)
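A minimal sketch of that last step (reusing QuoteHashMap from the code above; the output path is a placeholder): after collect() the map already lives on the driver, so plain java.io is enough.

import java.io.PrintWriter

val out = new PrintWriter("quote_values.txt")
for ((key, value) <- QuoteHashMap) {
  out.println(value.toString.replace("(", "").replace(")", ""))
}
out.close()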

Thanks
Sri 







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Create-hashmap-using-two-RDD-s-tp24996p25008.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Create hashmap using two RDD's

2015-10-09 Thread kali.tumm...@gmail.com
Hi all, 

I am trying to create a hashmap using two rdds, but I am running into "key not
found" issues.
Do I need to convert the RDDs to a list first?

1) one rdd has the key data
2) the other rdd has the value data

Key Rdd:-
val quotekey=file.map(x => x.split("\\|")).filter(line =>
line(0).contains("1017")).map(x => x(5)+x(4))

Value Rdd:-
val QuoteRDD=quotefile.map(x => x.split("\\|")).
 filter(line => line(0).contains("1017")).
 map(x =>
(x(5).toString+x(4).toString,x(5).toString,x(4).toString,x(1).toString ,
 if (x(15).toString =="B") 
   if (x(25).toString =="") x(9).toString  else
(x(25).toString),
   if (x(37).toString =="") x(11).toString else
(x(37).toString),
if (x(15).toString =="C") 
   if (x(24).toString =="") x(9).toString  else
(x(24).toString),
   if (x(30).toString =="") x(11).toString else
(x(30).toString),
 if (x(15).toString =="A") 
   x(9).toString,
   x(11).toString
))

Hash Map:-
val quotehash = new HashMap[String,String]
quotehash + quotekey.toString() -> QuoteRDD
quotehash("CPHI08172")

Error:-
key not found: CPHI08172

Thanks








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Create-hashmap-using-two-RDD-s-tp24996.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: KafkaProducer using Cassandra as source

2015-09-23 Thread kali.tumm...@gmail.com
Guys sorry I figured it out.

val
x=p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")

Full Code:-

package com.examples

/**
 * Created by kalit_000 on 22/09/2015.
 */

import kafka.producer.KeyedMessage
import kafka.producer.Producer
import kafka.producer.ProducerConfig
import java.util.Properties
import _root_.kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkProducerDBCassandra {

  case class TestTable (TRADE_ID:String,TRADE_PRICE: String)

  def main(args: Array[String]): Unit =
  {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local[2]").setAppName("testkali2").set("spark.cassandra.connection.host",
"127.0.0.1")
val sc=new SparkContext("local","test",conf)
//val ssc= new StreamingContext(sc,Seconds(2))

print("Test kali Spark Cassandra")

val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)

val p=cc.sql("select * from people.person")

p.collect().foreach(println)

val props:Properties = new Properties()
props.put("metadata.broker.list", "localhost:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")

val config= new ProducerConfig(props)
val producer= new Producer[String,String](config)

val
x=p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")

   producer.send(new KeyedMessage[String, String]("trade", x))

//p.collect().foreach(print)

//ssc.start()

//ssc.awaitTermination()

  }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaProducer-using-Cassandra-as-source-tp24774p24788.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka createDirectStream ​issue

2015-09-19 Thread kali.tumm...@gmail.com
Hi ,

I am trying to develop the same code in IntelliJ IDEA and I am having the same issue;
is there any workaround?

Error in IntelliJ:- cannot resolve symbol createDirectStream

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import  org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import  org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.kafka._

object SparkKafkaOffsetTest {

  def main(args: Array[String]): Unit = {
//Logger.getLogger("org").setLevel(Level.WARN)
//Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local").setAppName("KafkaOffsetStreaming").set("spark.executor.memory",
"1g")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))

val zkQuorm="localhost:2181"
val group="test-group"
val topics="first"
val numThreads=1
val broker="localhost:9091"
val kafkaParams = Map[String, String]("metadata.broker.list" -> broker)
//val kafkaParams = Map[String, String]("metadata.broker.list" )
val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap

//val lineMap=KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)

//val directKafkaStream=KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topics)

val messages= KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topics)



//val directKafkaStream = KafkaUtils.createDirectStream[
  //[key class], [value class], [key decoder class], [value decoder
class] ](
  //streamingContext, [map of Kafka parameters], [set of topics to
consume])

  }

}
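For the archive, a hedged note: in the 1.x spark-streaming-kafka direct API the third argument is a Set[String] of topic names rather than a plain String, so a sketch along these lines (reusing ssc, kafkaParams and topics from the code above) usually resolves in the IDE once that artifact is on the module's classpath.

val topicSet = topics.split(",").toSet
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet)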

Thanks
Sri




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456p24749.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to see my kafka spark streaming output

2015-09-19 Thread kali.tumm...@gmail.com
Hi All,

Figured it out: I forgot to set the master to local[2]; at least two cores are
required.

package com.examples

/**
 * Created by kalit_000 on 19/09/2015.
 */

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import  org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils


object SparkStreamingKafka {

  def main(args: Array[String]): Unit =
  {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local[2]").setAppName("KafkaStreaming").set("spark.executor.memory",
"1g")
val sc=new SparkContext(conf)
val ssc= new StreamingContext(sc,Seconds(2))

val zkQuorm="localhost:2181"
val group="test-group"
val topics="first"
val numThreads=1

val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap

val lineMap=KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)

val lines=lineMap.map(_._2)

lines.print

//lines.print()

//val words=lines.flatMap(_.split(" "))

   // val pair=words.map( x => (x,1))

//val
wordcount=pair.reduceByKeyAndWindow(_+_,_-_,Minutes(1),Seconds(2),2)

//wordcount.print

//ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")

ssc.checkpoint("C:\\scalatutorials\\sparkstreaming_checkpoint_folder")

//C:\scalatutorials\sparkstreaming_checkpoint_folder

ssc.start()

ssc.awaitTermination()





  }

}
Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-see-my-kafka-spark-streaming-output-tp24750p24751.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Unable to see my kafka spark streaming output

2015-09-19 Thread kali.tumm...@gmail.com
Hi All,
I am unable to see the output getting printed in the console; can anyone
help?

package com.examples

/**
 * Created by kalit_000 on 19/09/2015.
 */

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import  org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils


object SparkStreamingKafka {

  def main(args: Array[String]): Unit =
  {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local").setAppName("KafkaStreaming").set("spark.executor.memory",
"1g")
val sc=new SparkContext(conf)
val ssc= new StreamingContext(sc,Seconds(2))

val zkQuorm="localhost:2181"
val group="test-group"
val topics="first"
val numThreads=1

val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap

val lineMap=KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)

val lines=lineMap.map(_._2)

//lines.print()

val words=lines.flatMap(_.split(" "))

val pair=words.map( x => (x,1))

val wordcount=pair.reduceByKeyAndWindow(_+_,_-_,Minutes(1),Seconds(2),2)

wordcount.print

//ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")

ssc.checkpoint("C:\\scalatutorials\\sparkstreaming_checkpoint_folder")

//C:\scalatutorials\sparkstreaming_checkpoint_folder

ssc.start()

ssc.awaitTermination()





  }

}


Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-see-my-kafka-spark-streaming-output-tp24750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



word count (group by users) in spark

2015-09-19 Thread kali.tumm...@gmail.com
Hi All, 
I would like to achieve this below output using spark , I managed to write
in Hive and call it in spark but not in just spark (scala), how to group
word counts on particular user (column) for example.
Imagine users and their given tweets I want to do word count based on user
name.

Input:-
kali A,B,A,B,B
james B,A,A,A,B

Output:-
kali A [Count] B [Count]
James A [Count] B [Count]

My Hive Answer:-
CREATE EXTERNAL TABLE  TEST
(
 user_name string   ,
 COMMENTS  STRING 
   
)  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
LOCATION '/data/kali/test';    HDFS FOLDER (create hdfs folder and
create a text file with data mentioned in the email)

use default;select user_name,COLLECT_SET(text) from (select
user_name,concat(sub,' ',count(comments)) as text  from test LATERAL VIEW
explode(split(comments,',')) subView AS sub group by user_name,sub)w group
by user_name;

Spark With Hive:-
package com.examples

/**
 * Created by kalit_000 on 17/09/2015.
 */
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._


object HiveWordCount {

  def main(args: Array[String]): Unit =
  {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory",
"1g")
val sc = new SparkContext(conf)
val sqlContext= new SQLContext(sc)

val hc=new HiveContext(sc)

hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST  (user_name
string ,COMMENTS STRING )ROW FORMAT DELIMITED FIELDS TERMINATED BY '001' 
STORED AS TEXTFILE LOCATION '/data/kali/test' ")

val op=hc.sql("select user_name,COLLECT_SET(text) from (select
user_name,concat(sub,' ',count(comments)) as text  from default.test LATERAL
VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w
group by user_name")

op.collect.foreach(println)


  }

}
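Not part of the original post, but a hedged plain-RDD sketch of the same per-user word count without Hive, reusing the sc from the code above and assuming lines shaped like "kali A,B,A,B,B" as in the sample; swap the first delimiter for "\u0001" if the file is really ^A-delimited as in the DDL.

val lines = sc.textFile("/data/kali/test")
val counts = lines.map(_.split(" ", 2)).filter(_.length == 2)   // (user, comma-separated words)
  .flatMap { case Array(user, words) => words.split(",").map(w => ((user, w), 1)) }
  .reduceByKey(_ + _)                                           // count per (user, word)
  .map { case ((user, w), n) => (user, s"$w $n") }
  .groupByKey()                                                 // gather each user's word counts
counts.collect().foreach { case (user, wc) => println(user + " " + wc.mkString(" ")) }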




Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



split function on spark sql created rdd

2015-05-23 Thread kali.tumm...@gmail.com
Hi All, 

I am trying to do a word count on a number of tweets; my first step is to get
the data from a table using spark sql and then run a split function on top of it to
calculate the word count.

Error:- value split is not a member of org.apache.spark.sql.SchemaRDD

Spark Code that doesn't work to do word count:-

val distinct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))
//tried split on both rdds, didn't work
distinct_tweets.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
distinct_tweets_List.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

But when I output the data from sparksql to a file and load it again and run
split it works.

Example Code that works:-

val distinct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val rdd=sc.parallelize(distinct_tweets_op)
rdd.saveAsTextFile("/home/cloudera/bdp/op")
val textFile=sc.textFile("/home/cloudera/bdp/op/part-0")
val counts=textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.saveAsTextFile("/home/cloudera/bdp/wordcount")

I don't want to write to a file; instead I want to collect into an rdd and apply a
filter function on top of the schema rdd. Is there a way?
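A hedged sketch of that in-memory route (reusing distinct_tweets and assuming text is the first and only column of the result): a SchemaRDD/DataFrame is a collection of Row objects, so pulling the string column out first lets split and any filter work without writing to disk.

val counts = distinct_tweets
  .map(row => row.getString(0))          // Row -> the text column
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.collect().foreach(println)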

Thanks 







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/split-function-on-spark-sql-created-rdd-tp23001.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org