Spark + Salesforce SSL handshake issue
Hi All,

I was trying out Spark with the springml Salesforce library on Cloudera 5.9 and I am hitting an SSL handshake issue. Please check my question on Stack Overflow — no one has answered it: https://stackoverflow.com/questions/47820372/ssl-handshake-issue-on-cloudera-5-9. The library works fine on Windows but fails when I run it on the Cloudera edge node.

Has anyone tried these libraries: spark-salesforce (https://github.com/springml/spark-salesforce) and salesforce-wave-api (https://github.com/springml/salesforce-wave-api)?

I am trying to use the salesforce-wave-api library on our Cloudera 5.9 cluster to get data. I have to go through a proxy because that is the only way our cluster can reach the outside world, so I modified the library to take a proxy host and port. These are the changes I made:

Change 1:
config.setProxy("myenterpiseproxy server", port);
https://github.com/springml/salesforce-wave-api/blob/0ac76aeb2221d9e7038229fd352a8694e8cde7e9/src/main/java/com/springml/salesforce/wave/util/SFConfig.java#L101

Change 2:
HttpHost proxy = new HttpHost("myenterpiseproxy server", port, "http");
https://github.com/springml/salesforce-wave-api/blob/0ac76aeb2221d9e7038229fd352a8694e8cde7e9/src/main/java/com/springml/salesforce/wave/util/HTTPHelper.java#L127

Change 3:
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(timeout)
    .setConnectTimeout(timeout).setConnectionRequestTimeout(timeout).setProxy(proxy).build();
https://github.com/springml/salesforce-wave-api/blob/0ac76aeb2221d9e7038229fd352a8694e8cde7e9/src/main/java/com/springml/salesforce/wave/util/HTTPHelper.java#L129

I built an application that uses salesforce-wave-api as a dependency, and when I execute the jar I get an SSL handshake exception. I passed in javax.net.ssl.trustStore, javax.net.ssl.keyStore and https.protocols for my cluster but I still have the problem. Has anyone had a similar issue? Has anyone used this library on a Cloudera cluster?
Run book:

java -cp httpclient-4.5.jar:SFWaveApiTest-1.0-SNAPSHOT-jar-with-dependencies.jar com.az.sfget.SFGetTest "username" "passwordwithtoken" "https://test.salesforce.com/services/Soap/u/35" "select id,OWNERID from someobject" "enterpiseproxyhost" "9400" "TLSv1.1,TLSv1.2" "/usr/java/jdk1.7.0_67-cloudera/jre/lib/security/jssecacerts" "/opt/cloudera/security/jks/uscvlpcldra-keystore.jks"

Error:

javax.net.ssl.SSLHandshakeException: Remote host closed connection during handshake
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:946)
    at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1312)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1339)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1323)
    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:394)
    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:353)
    at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:134)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:353)
    at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:388)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
    at com.springml.salesforce.wave.util.HTTPHelper.execute(HTTPHelper.java:122)
    at com.springml.salesforce.wave.util.HTTPHelper.get(HTTPHelper.java:88)
    at com.springml.salesforce.wave.util.HTTPHelper.get(HTTPHelper.java:92)
    at com.springml.salesforce.wave.impl.ForceAPIImpl.query(ForceAPIImpl.java:120)
    at com.springml.salesforce.wave.impl.ForceAPIImpl.query(ForceAPIImpl.java:36)
    at com.az.sfget.SFGetTest.main(SFGetTest.java:54)
Caused by: java.io.EOFException: SSL peer shut down incorrectly
    at sun.security.ssl.InputRecord.read(InputRecord.java:482)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
    ... 21 more

Thanks
Sri
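In case it is useful for comparison, this is roughly how the same TLS and proxy settings can be passed as standard JVM system properties instead of program arguments. The host names and paths below are just the placeholders from this thread, and note that, as far as I know, Apache HttpClient only honours https.protocols and the proxy properties when the client is built with useSystemProperties(), so the in-library proxy change may still be needed:

java -Djavax.net.ssl.trustStore=/usr/java/jdk1.7.0_67-cloudera/jre/lib/security/jssecacerts \
     -Dhttps.protocols=TLSv1.1,TLSv1.2 \
     -Dhttps.proxyHost=enterpiseproxyhost -Dhttps.proxyPort=9400 \
     -cp httpclient-4.5.jar:SFWaveApiTest-1.0-SNAPSHOT-jar-with-dependencies.jar \
     com.az.sfget.SFGetTest "username" "passwordwithtoken" "https://test.salesforce.com/services/Soap/u/35" "select id,OWNERID from someobject"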
sql to spark scala rdd
Hi All, I managed to write this business requirement in Spark SQL and Hive, but I am still learning Scala — how would the SQL below be written using the Spark RDD API rather than DataFrames?

SELECT DATE, balance,
       SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS daily_balance
FROM table
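A rough sketch of one way to do the running total with plain RDDs, in case it helps. It is untested and assumes the rows are (date, balance) pairs whose dates sort correctly as strings — both are assumptions, not facts from the original table:

import org.apache.spark.rdd.RDD

// rows: RDD[(String, Double)] of (date, balance), e.g. loaded from the table
def runningBalance(rows: RDD[(String, Double)]): RDD[(String, Double, Double)] = {
  val sorted = rows.sortBy(_._1)                            // ORDER BY DATE
  // total balance of each partition, so later partitions know their starting offset
  val partSums = sorted.mapPartitions(it => Iterator(it.map(_._2).sum)).collect()
  val offsets  = partSums.scanLeft(0.0)(_ + _)              // offset(i) = sum of partitions 0..i-1
  sorted.mapPartitionsWithIndex { (i, it) =>
    var acc = offsets(i)
    it.map { case (date, bal) => acc += bal; (date, bal, acc) }   // acc = daily_balance
  }
}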
spark local dir to HDFS ?
Hi All, can I set spark.local.dir to an HDFS location instead of the /tmp folder? I tried pointing the temp folder at HDFS but it didn't work — can spark.local.dir write to HDFS?

.set("spark.local.dir","hdfs://namenode/spark_tmp/")

16/07/05 15:35:47 ERROR DiskBlockManager: Failed to create local dir in hdfs://namenode/spark_tmp/. Ignoring this directory.
java.io.IOException: Failed to create a temp directory (under hdfs://namenode/spark_tmp/) after 10 attempts!

Thanks
Sri
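For what it's worth, my understanding is that spark.local.dir is shuffle/spill scratch space and has to point at local disks on each node (on YARN it is taken from the YARN local dirs instead), so something like the sketch below rather than an hdfs:// URI. The paths here are made-up examples:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("myApp")
  .set("spark.local.dir", "/data1/spark_tmp,/data2/spark_tmp")  // comma-separated list of local dirs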
Re: spark parquet too many small files ?
I found the JIRA for the issue — will there be a fix in a future release, or no fix? https://issues.apache.org/jira/browse/SPARK-6221
Re: spark parquet too many small files ?
Hi Neelesh, as I said in my earlier emails, it is not a Spark/Scala application — I am working with Spark SQL only. I launch the spark-sql shell and run my Hive code inside it. The spark-sql shell accepts Spark SQL statements, but it doesn't accept functions like coalesce, which belong to the Spark Scala API. What I am trying to do is below:

FROM (SELECT * FROM source_table WHERE load_date = "2016-09-23") a
INSERT OVERWRITE TABLE target_table
SELECT *

Thanks
Sri
Sent from my iPhone

> On 1 Jul 2016, at 17:35, nsalian [via Apache Spark User List] wrote:
>
> Hi Sri,
>
> Thanks for the question. You can simply start by doing this in the initial stage:
>
>   val sqlContext = new SQLContext(sc)
>   val customerList = sqlContext.read.json(args(0)).coalesce(20) // using a json example here
>
> where the argument is the path to the file(s). This will reduce the partitions.
> You can proceed with repartitioning the data further on. The goal would be to
> reduce the number of files in the end as you do a saveAsParquet.
>
> Hope that helps.
> Neelesh S. Salian
> Cloudera
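One thing that can be tried purely inside the spark-sql shell (a sketch only — I have not verified it against this exact table) is to lower the shuffle partitions and add a DISTRIBUTE BY, so the insert is funnelled through a small, fixed number of partitions and therefore a small number of output files:

set spark.sql.shuffle.partitions=20;

INSERT OVERWRITE TABLE target_table
SELECT * FROM source_table
WHERE load_date = "2016-09-23"
DISTRIBUTE BY load_date;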
spark parquet too many small files ?
Hi All, I am running Hive queries through spark-sql in YARN client mode. The SQL is pretty simple: load dynamic partitions into a target parquet table. I used Hive configuration parameters such as (set hive.merge.smallfiles.avgsize=25600; set hive.merge.size.per.task=256000;), which usually merge small files up to the 256 MB block size. Are these parameters supported in spark-sql, and is there another way to merge a large number of small parquet files into bigger ones? If this were a Scala application I could use the coalesce() or repartition functions, but here we are not using a Spark/Scala application — it is just plain spark-sql.

Thanks
Sri
Re: Best way to merge final output part files created by Spark job
Try using the coalesce function to repartition to the desired number of output files. To merge files that have already been written, use Hive and INSERT OVERWRITE the table with the options below:

set hive.merge.smallfiles.avgsize=256;
set hive.merge.size.per.task=256;
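If it is a Spark/Scala job rather than plain SQL, a minimal sketch of the coalesce approach with the DataFrame writer (Spark 1.4+); the paths and partition count below are made up:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.parquet("/data/input")

df.coalesce(16)                        // cap the number of output part files at 16
  .write.mode("overwrite")
  .parquet("/data/output/merged")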
Re: output part files max size
I am not sure, but you can use the coalesce function to reduce the number of output files.

Thanks
Sri
Spark partition formula on standalone mode?
Hi All, I have worked with Spark installed on a Hadoop cluster but never with Spark on a standalone cluster. My question: how do I set the number of partitions when Spark runs on a standalone cluster? With Spark on Hadoop I calculate my formula using HDFS block sizes, but how do I calculate it without an HDFS block size when Spark runs on a standalone, non-Hadoop cluster?

Partition formula for a 100 GB file:
HDFS block size: 256 MB
Partitions: 100 * 1024 / 256 = 400
Executors: 100 / 4 = 25
Executor memory: 160 GB / 25 = ~7 GB
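Without an HDFS block size one option is simply to pick a target partition size and derive the count from the file size; a sketch of the same arithmetic (the 256 MB target below is just carried over from the HDFS case, it is not a rule):

val fileSizeMb    = 100 * 1024                       // 100 GB file
val targetSplitMb = 256                              // chosen target partition size
val numPartitions = math.ceil(fileSizeMb.toDouble / targetSplitMb).toInt  // = 400

// minPartitions hint when reading a plain file on a standalone cluster
val rdd = sc.textFile("/path/to/100gb/file", numPartitions)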
spark sql broadcast join ?
Hi All, I have used broadcast joins in Spark/Scala applications, where I used partitionBy (HashPartitioner) and then persist for wide dependencies. The project I am working on now is pretty much a Hive migration to spark-sql — it is all SQL, no Scala or Python apps, to be honest. My question: how do I achieve a broadcast join in plain spark-sql? At the moment a join between two tables is taking ages.

Thanks
Sri
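A sketch of what I understand works in plain spark-sql (the table and column names below are made up): give the small table size statistics and raise the auto-broadcast threshold, and the planner should pick a broadcast join on its own.

-- give the dimension table size statistics so the planner knows it is small
ANALYZE TABLE dim_table COMPUTE STATISTICS noscan;

-- tables below this size (bytes) are broadcast; the default is 10 MB
SET spark.sql.autoBroadcastJoinThreshold=104857600;

SELECT f.*, d.some_column
FROM fact_table f
JOIN dim_table d
  ON f.join_key = d.join_key;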
Re: Spark 1.6 Driver Memory Issue
Hi, I am using the spark-sql shell. While launching it I run it as

spark-sql --conf spark.driver.maxResultSize=20g

I also tried spark-sql --conf "spark.driver.maxResults"="20g" but still no luck. Do I need to use a set command, something like spark-sql --conf set "spark.driver.maxReults"="20g"? Is it --conf or -conf?

Thanks
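For what it's worth, it is --conf (two dashes), and the property has to be spelled exactly spark.driver.maxResultSize — misspelled keys such as spark.driver.maxResults are silently ignored. So something like the line below should take effect (my_script.sql is just a placeholder for the SQL file being run):

spark-sql --conf spark.driver.maxResultSize=20g -f my_script.sql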
Spark 1.6 Driver Memory Issue
Hi All, I am getting a Spark driver memory issue even after overriding the conf with --conf spark.driver.maxResultSize=20g, and I also set it in my SQL script (set spark.driver.maxResultSize=16;), but the same error keeps happening:

Job aborted due to stage failure: Total size of serialized results of 79 tasks (1035.2 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

Any thoughts?

Thanks
Sri
set spark 1.6 with Hive 0.14 ?
Hi All, is there a way to ask Spark and spark-sql to use Hive 0.14 instead of the built-in Hive 1.2.1? I am testing spark-sql locally by downloading Spark 1.6 from the internet, and I want to execute my Hive queries in Spark SQL using Hive version 0.14 — can I go back to the previous version just for a simple test? Please share the steps involved.

Thanks
Sri
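A sketch of what I believe the relevant knobs are on Spark 1.6 — they change which Hive version Spark uses to talk to the metastore (query execution still uses the built-in 1.2.1 jars), and the jar path below is only a placeholder:

spark-sql --conf spark.sql.hive.metastore.version=0.14.0 \
          --conf spark.sql.hive.metastore.jars=/opt/hive-0.14.0/lib/*:/etc/hadoop/conf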
how to run latest version of spark in old version of spark in cloudera cluster ?
Hi All, I just realized the Cloudera version of Spark on my cluster is 1.2, while the jar I built with Maven targets 1.6, which is causing the issue. Is there a way to run Spark 1.6 applications on a cluster that ships Spark 1.2?

Thanks
Sri
org.netezza.error.NzSQLException: ERROR: Invalid datatype - TEXT
Hi All, I am using a Spark JDBC DataFrame to store data into Netezza. Spark tries to create the table using the data type TEXT for string columns, but Netezza doesn't support the TEXT data type. How do I override Spark's behaviour to use VARCHAR instead of TEXT?

val sourcedfmode=sourcedf.persist(StorageLevel.MEMORY_AND_DISK).write.mode("overwrite")
sourcedfmode.jdbc(TargetDBinfo.url,TargetDBinfo.table,targetprops)

Thanks
Re: org.netezza.error.NzSQLException: ERROR: Invalid datatype - TEXT
Fixed by creating a new Netezza dialect and registering it with JdbcDialects.registerDialect(NetezzaDialect) (see spark/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala):

package com.citi.ocean.spark.elt

/**
 * Created by st84879 on 26/01/2016.
 */

import java.sql.{Connection, Types}

import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.jdbc.JdbcType
import org.apache.spark.sql.types._

private object NetezzaDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:netezza")

  override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
      Some(BinaryType)
    } else if (sqlType == Types.OTHER) {
      toCatalystType(typeName).filter(_ == StringType)
    } else if (sqlType == Types.ARRAY && typeName.length > 1 && typeName(0) == '_') {
      toCatalystType(typeName.drop(1)).map(ArrayType(_))
    } else None
  }

  // TODO: support more type names.
  private def toCatalystType(typeName: String): Option[DataType] = typeName match {
    case "bool"                 => Some(BooleanType)
    case "bit"                  => Some(BinaryType)
    case "int2"                 => Some(ShortType)
    case "int4"                 => Some(IntegerType)
    case "int8" | "oid"         => Some(LongType)
    case "float4"               => Some(FloatType)
    case "money" | "float8"     => Some(DoubleType)
    case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" => Some(StringType)
    case "bytea"                => Some(BinaryType)
    case "timestamp" | "timestamptz" | "time" | "timetz" => Some(TimestampType)
    case "date"                 => Some(DateType)
    case "numeric"              => Some(DecimalType.SYSTEM_DEFAULT)
    case _                      => None
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType  => Some(JdbcType("VARCHAR(1000)", Types.CHAR))
    case BinaryType  => Some(JdbcType("BYTEA", Types.BINARY))
    case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
    case FloatType   => Some(JdbcType("FLOAT4", Types.FLOAT))
    case DoubleType  => Some(JdbcType("FLOAT8", Types.DOUBLE))
    case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
      getJDBCType(et).map(_.databaseTypeDefinition)
        .orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
        .map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
    case ByteType    => throw new IllegalArgumentException(s"Unsupported type in netezza: $dt")
    case _           => None
  }

  override def getTableExistsQuery(table: String): String = {
    s"SELECT 1 FROM $table LIMIT 1"
  }

  override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
    super.beforeFetch(connection, properties)
    // According to the postgres jdbc documentation we need to be in autocommit=false if we actually
    // want to have fetchsize be non 0 (all the rows). This allows us to not have to cache all the
    // rows inside the driver when fetching.
    //
    // See: https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
    //
    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
      connection.setAutoCommit(false)
    }
  }
}
Re: spark 1.6 Issue
Hi All, it worked OK after adding the following VM options:

-Xms128m -Xmx512m -XX:MaxPermSize=300m -ea

Thanks
Sri
spark 1.6 Issue
Hi All, I am running my app in IntelliJ IDEA (locally) with master local[*]. The code worked fine with Spark 1.5, but after upgrading to 1.6 I get the error below. Is this a bug in 1.6? If I change back to 1.5 it works without any error. Do I need to pass executor memory when running locally in Spark 1.6?

Exception in thread "main" java.lang.IllegalArgumentException: System memory 259522560 must be at least 4.718592E8. Please use a larger heap size.

Thanks
Sri
Apache spark certification pass percentage ?
Hi All, does anyone know the pass percentage for the Apache Spark certification exam?

Thanks
Sri
how to turn off spark streaming gracefully ?
Hi All, imagine I have production Spark Streaming Kafka (direct connection) publisher and subscriber jobs running, which publish to and receive from a Kafka topic, and I save one day's worth of data using dstream.slice to a daily Cassandra table (so I create the daily table before running the streaming job). My question: if all of the above runs under a scheduler like Autosys, how do I tell the publisher to stop publishing and the subscriber to stop receiving at end of day without killing the jobs? If I kill them, the Autosys scheduler turns red saying the job failed. Is there a way to stop both the subscriber and the publisher without killing or terminating the code?

Thanks
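One pattern I have seen for this, sketched below — shutdownRequested() is a hypothetical helper that checks an external end-of-day flag, for example a marker file on HDFS or a row in a control table that the scheduler writes:

ssc.start()

// poll the external end-of-day flag instead of calling awaitTermination()
while (!shutdownRequested()) {
  Thread.sleep(10000)
}

// drain whatever has already been received, then shut down cleanly
ssc.stop(stopSparkContext = true, stopGracefully = true)

I believe there is also spark.streaming.stopGracefullyOnShutdown=true, which makes a normal shutdown signal do a graceful stop, so the scheduler sees a clean exit rather than a kill.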
Re: spark data frame write.mode("append") bug
Hi All,

https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48

In the present Spark version there is a bug at line 48: checking whether a table exists in a database using LIMIT doesn't work for all databases — SQL Server, for example. The best way to check whether a table exists in any database is to use

select * from table where 1=2;
or
select 1 from table where 1=2;

which is supported by all databases. Can this change be implemented in Spark 1.6? It would make the write.mode("append") bug go away.

Current code:

def tableExists(conn: Connection, table: String): Boolean = {
  // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
  // SQL database systems, considering "table" could also include the database name.
  Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
}

Solution:

def tableExists(conn: Connection, table: String): Boolean = {
  // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
  // SQL database systems, considering "table" could also include the database name.
  Try(conn.prepareStatement(s"SELECT 1 FROM $table where 1=2").executeQuery().next()).isSuccess
}

Thanks
spark data frame write.mode("append") bug
Hi Spark contributors, I am trying to append data to a target table using the df.write.mode("append") functionality, but Spark throws a "table already exists" exception. Is a fix scheduled for a later Spark release? I am using Spark 1.5.

val sourcedfmode=sourcedf.write.mode("append")
sourcedfmode.jdbc(TargetDBinfo.url,TargetDBinfo.table,targetprops)

Full code: https://github.com/kali786516/ScalaDB/blob/master/src/main/java/com/kali/db/SaprkSourceToTargetBulkLoad.scala
Spring config file: https://github.com/kali786516/ScalaDB/blob/master/src/main/resources/SourceToTargetBulkLoad.xml

Thanks
Sri
Re: can i write only RDD transformation into hdfs or any other storage system
Hi Prateek, do you mean writing Spark output to any storage system? Yes, you can.

Thanks
Sri
Release date for spark 1.6?
Hi All, does anyone know the exact release date for Spark 1.6?

Thanks
Sri
Re: Exception in Spark-sql insertIntoJDBC command
Hi All, I get the same error in Spark 1.5 — is there any solution or way to get around this? I also tried using sourcedf.write.mode("append") but still no luck.

val sourcedfmode=sourcedf.write.mode("append")
sourcedfmode.jdbc(TargetDBinfo.url,TargetDBinfo.table,targetprops)

Thanks
Sri
spark sql current time stamp function ?
Hi All, is there a Spark SQL function which returns the current timestamp?

Examples:
Impala: select NOW();
SQL Server: select GETDATE();
Netezza: select NOW();

Thanks
Sri
Re: spark sql current time stamp function ?
I found a way out:

import java.text.SimpleDateFormat
import java.util.Date

val format = new SimpleDateFormat("yyyy-M-dd hh:mm:ss")

val testsql=sqlContext.sql("select column1,column2,column3,column4,column5,'%s' as TIME_STAMP from TestTable limit 10".format(format.format(new Date())))

Thanks
Sri
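For reference, on Spark 1.5 and later there are built-in functions that avoid the string formatting altogether — a sketch using the column and table names from this thread:

val testsql = sqlContext.sql(
  "select column1, column2, current_timestamp() as time_stamp, " +
  "from_unixtime(unix_timestamp()) as time_stamp_string " +
  "from TestTable limit 10")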
Spark sql random number or sequence numbers ?
Hi All, I have implemented random numbers using Scala Spark — is there a function that gives the equivalent of row_number in Spark SQL?

Examples:
SQL Server: row_number()
Netezza: sequence number
MySQL: sequence number

Example of what I want:
val testsql=sqlContext.sql("select column1,column2,column3,column4,column5 ,row_number() as random from TestTable limit 10")

Thanks
Sri
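Two options I am aware of, sketched with the same made-up columns: row_number() needs a window specification (and, on Spark 1.x, a HiveContext), while monotonicallyIncreasingId gives unique but not consecutive numbers.

// window function (HiveContext on Spark 1.x)
val numbered = sqlContext.sql(
  "select column1, column2, row_number() over (order by column1) as row_num " +
  "from TestTable limit 10")

// or a unique (non-consecutive) id per row on the DataFrame side
import org.apache.spark.sql.functions.monotonicallyIncreasingId
val withId = sqlContext.table("TestTable").withColumn("id", monotonicallyIncreasingId())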
Spark sql data frames do they run in parallel by default?
Hi all, I wrote the Spark code below to extract data from SQL Server using sqlContext.read.format with several different options. Question: does the sqlContext.read load function run in parallel by default — does it use all the available cores? When I save the output to a file it is saved as one file; does that mean the code used a single core on my machine?

My code:

package com.kali.db

/**
 * Created by kalit_000 on 06/12/2015.
 */

import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark._
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.sql.DataFrame
import org.springframework.context.support.ClassPathXmlApplicationContext

case class SparkSqlValueClass(driver:String,url:String,username:String,password:String,sql:String,table:String,opdelimeter:String)

object SparkSqlSelectSpring {
  def main (args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSqlConfigurable").set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)

    def opfile(value:DataFrame,delimeter:String):RDD[String]= {
      value.map(x => x.toString.replace("[","").replace("]","").replace(",",delimeter))
    }

    //read the application context file
    val ctx = new ClassPathXmlApplicationContext("sparksql.xml")
    val DBinfo = ctx.getBean("SparkSQLInst").asInstanceOf[SparkSqlValueClass]

    val driver = DBinfo.driver
    val url = DBinfo.url
    val username = DBinfo.username
    val password = DBinfo.password
    val query = DBinfo.sql
    val sqlquery = DBinfo.sql
    val table = DBinfo.table
    val opdelimeter=DBinfo.opdelimeter

    println("DB Driver:-%s".format(driver))
    println("DB Url:-%s".format(url))
    println("Username:-%s".format(username))
    println("Password:-%s".format(password))
    println("Query:-%s".format(query))
    println("Table:-%s".format(table))
    println("Opdelimeter:-%s".format(opdelimeter))

    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val df = sqlContext.read.format("jdbc").options(Map("url" -> url,"dbtable" -> table,"driver" -> driver)).load()
      df.registerTempTable(table)
      val OP=sqlContext.sql(query)
      opfile(OP,opdelimeter).saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\typesafe\\scaladbop\\op.txt")
    } catch {
      case e: Exception => e.printStackTrace
    }
    sc.stop()
  }
}

Spring bean: (the bean XML did not survive the mailing-list archive; only the spring-beans DTD reference is left: http://www.springframework.org/dtd/spring-beans.dtd)

Thanks
Sri
Re: Spark sql data frames do they run in parallel by default?
Hi All, I rewrote my code to use sqlContext.read.jdbc, which lets me specify upperBound, lowerBound, numPartitions etc., so it should run in parallel. I still need to try it on a cluster, which I will do when I have time. But please confirm: does read.jdbc do parallel reads?

Spark code:

package com.kali.db

/**
 * Created by kalit_000 on 06/12/2015.
 */

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark._
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.sql.DataFrame
import org.springframework.context.support.ClassPathXmlApplicationContext

case class SparkSqlValueClassMPP(driver:String,url:String,username:String,password:String,sql:String,table:String,opdelimeter:String,lowerbound:String,upperbound:String,numberofparitions:String,parallelizecolumn:String)

object SparkDBExtractorMPP {
  def main (args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkDBExtractorMPP").set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)

    def opfile(value:DataFrame,delimeter:String):RDD[String]= {
      value.map(x => x.toString.replace("[","").replace("]","").replace(",",delimeter))
    }

    //read the application context file
    val ctx = new ClassPathXmlApplicationContext("sparkDBExtractorMpp.xml")
    val DBinfo = ctx.getBean("SparkSQLDBExtractorMPP").asInstanceOf[SparkSqlValueClassMPP]

    val driver = DBinfo.driver
    val url = DBinfo.url
    val username = DBinfo.username
    val password = DBinfo.password
    val query = DBinfo.sql
    val sqlquery = DBinfo.sql
    val table = DBinfo.table
    val opdelimeter=DBinfo.opdelimeter
    val lowerbound=DBinfo.lowerbound.toInt
    val upperbound=DBinfo.upperbound.toInt
    val numberofpartitions=DBinfo.numberofparitions.toInt
    val parallelizecolumn=DBinfo.parallelizecolumn

    println("DB Driver:-%s".format(driver))
    println("DB Url:-%s".format(url))
    println("Username:-%s".format(username))
    println("Password:-%s".format(password))
    println("Query:-%s".format(query))
    println("Table:-%s".format(table))
    println("Opdelimeter:-%s".format(opdelimeter))
    println("Lowerbound:-%s".format(lowerbound))
    println("Upperbound:-%s".format(upperbound))
    println("Numberofpartitions:-%s".format(numberofpartitions))
    println("Parallelizecolumn:-%s".format(parallelizecolumn))

    try {
      val props=new Properties()
      props.put("user",username)
      props.put("password",password)
      props.put("driver",driver)

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val df = sqlContext.read.jdbc(url,table,parallelizecolumn,lowerbound,upperbound,numberofpartitions,props)

      df.show(10)
      df.registerTempTable(table)
      val OP=sqlContext.sql(query)
      opfile(OP,opdelimeter).saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\typesafe\\scaladbop\\op.txt")
    } catch {
      case e: Exception => e.printStackTrace
    }
    sc.stop()
  }
}

Spring: (the bean XML did not survive the mailing-list archive; only the spring-beans DTD reference is left: http://www.springframework.org/dtd/spring-beans.dtd)
Does spark streaming write ahead log writes all received data to HDFS ?
Hi All, if write ahead logs are enabled in Spark Streaming, does all the received data get written to the HDFS path, or only the metadata? How does clean-up work — does the HDFS path keep growing bigger every day, and do I need to write a clean-up job to delete data from the write ahead log folder? What exactly does the write ahead log folder contain?

Thanks
Sri
spark inner join
Hi All, in SQL, say I have table1 (movieid) and table2 (movieid, moviename). In SQL we write something like:

select moviename, movieid, count(1)
from table2 inner join table1 on table1.movieid = table2.movieid
group by moviename, movieid

Here table1 has only one column while table2 has two columns, and the join still works. In the same way, can Spark join on keys from both RDDs? When I tried to join two RDDs in Spark, both RDDs had to be key/value pairs, so I had to add a dummy value 0, for example. Is there another way around this, or am I doing it completely wrong?

val lines=sc.textFile("C:\\Users\\kalit_000\\Desktop\\udemy_spark\\ml-100k\\u.data")
val movienamesfile=sc.textFile("C:\\Users\\kalit_000\\Desktop\\udemy_spark\\ml-100k\\u.item")

val moviesid=lines.map(x => x.split("\t")).map(x => (x(1),0))
val test=moviesid.map(x => x._1)
val movienames=movienamesfile.map(x => x.split("\\|")).map(x => (x(0),x(1)))
val movienamejoined=moviesid.join(movienames).distinct()

Thanks
Sri
Spark error:- Not a valid DFS File name
Hi All, I got this weird error when I tried to run Spark in yarn-cluster mode. I have 33 files and I loop over them in bash, running Spark on them one by one; most of them worked fine except a few files. Is the error below an HDFS error or a Spark error?

Exception in thread "Driver" java.lang.IllegalArgumentException: Pathname /user/myid/-u/12:51/_temporary/0 from hdfs://dev/user/myid/-u/12:51/_temporary/0 is not a valid DFS filename.

This is the file name I passed to Spark — does the file name cause the issue?

hdfs://dev/data/20151019/sipmktdata.ColorDataArchive.UTD.P4_M-P.v5.2015-09-18.txt.20150918

Thanks
Sri
Re: Spark error:- Not a valid DFS File name
Full error:

at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:195)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:104)
at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:831)
at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:827)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:827)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:820)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:305)
at org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
at org.apache.spark.SparkHadoopWriter.preSetup(SparkHadoopWriter.scala:64)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1046)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:941)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:850)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1164)
at com.citi.ocean.spark.SimpleMktDataFlow$.main(SimpleMktDataFlow.scala:106)
at com.citi.ocean.spark.SimpleMktDataFlow.main(SimpleMktDataFlow.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
difference between rdd.collect().toMap to rdd.collectAsMap() ?
Hi All, is there any performance impact when I use collectAsMap on my RDD instead of rdd.collect().toMap? I have a key/value RDD and I want to convert it to a HashMap. As far as I know collect() is not efficient on large data sets, as it runs on the driver — can I use collectAsMap instead, and is there any performance impact?

Original:

val QuoteHashMap=QuoteRDD.collect().toMap
val QuoteRDDData=QuoteHashMap.values.toSeq
val QuoteRDDSet=sc.parallelize(QuoteRDDData.map(x => x.toString.replace("(","").replace(")","")))
QuoteRDDSet.saveAsTextFile(Quotepath)

Change:

val QuoteHashMap=QuoteRDD.collectAsMap()
val QuoteRDDData=QuoteHashMap.values.toSeq
val QuoteRDDSet=sc.parallelize(QuoteRDDData.map(x => x.toString.replace("(","").replace(")","")))
QuoteRDDSet.saveAsTextFile(Quotepath)

Thanks
Sri
Pass spark partition explicitly ?
Hi All, can I pass the number of partitions for all RDDs explicitly while submitting the Spark job, or do I need to set it in my Spark code itself?

Thanks
Sri
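For the cases it covers, the parallelism defaults can be passed at submit time rather than in code; a sketch (the class and jar names are placeholders). Note that spark.default.parallelism applies to shuffles and to sc.parallelize, while the partitions of a file-based RDD still come from the input splits unless you pass minPartitions in code:

spark-submit --conf spark.default.parallelism=400 \
             --conf spark.sql.shuffle.partitions=400 \
             --class com.example.MyJob myjob.jar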
can I use Spark as alternative for gem fire cache ?
Hi All, can Spark be used as an alternative to a GemFire cache? We use a GemFire cache to hold dimension data in memory, which is later used by our custom-made Java ETL tool. Can I do something like the below — can I cache an RDD in memory for a whole day? As far as I know the RDD goes away once the Spark job finishes executing (correct me if I am wrong).

Spark:
create an RDD
rdd.persist

Thanks
Output println info in LogMessage Info ?
Hi All, in Unix I can print a warning or info message using LogMessage WARN "Hi All" or LogMessage INFO "Hello World" — is there something similar in Spark? Imagine I want to print the count of an RDD in the logs instead of using println.

Thanks
Sri
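A minimal sketch of what can be done — Spark ships with log4j, so a plain log4j Logger in the driver code ends up in the application logs instead of stdout (the input path is a placeholder):

import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}

object LogCountExample {
  def main(args: Array[String]): Unit = {
    val log = Logger.getLogger(getClass.getName)
    val sc  = new SparkContext(new SparkConf().setAppName("LogCountExample"))

    val cnt = sc.textFile("/data/input/somefile").count()
    log.info("record count = " + cnt)          // like LogMessage INFO
    if (cnt == 0) log.warn("record count is zero!")  // like LogMessage WARN

    sc.stop()
  }
}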
Is there any better way of writing this code
Hi All, just wondering if there is any better way of writing the code below. I am new to Spark, and I feel what I wrote is pretty simple, basic and straightforward — is there a better way of writing it using the functional paradigm?

val QuoteRDD=quotefile.map(x => x.split("\\|")).
  filter(line => line(0).contains("1017")).
  map(x => ((x(5)+x(4)) , (x(5),x(4),x(1) ,
    if (x(15) =="B")
      ( {if (x(25) == "") x(9) else x(25)},
        {if (x(37) == "") x(11) else x(37)} )
    else if (x(15) =="C" )
      ( {if (x(24) == "") (x(9)) else x(24)},
        {if (x(30) == "") (x(11)) else x(30)} )
    else if (x(15) =="A")
      {(x(9),x(11))}
  )))

Thanks
Sri
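Not necessarily better, but one way to make the intent clearer is to name the repeated fallback logic and use a match on the record type. This is a sketch only — I'm guessing at what the columns mean, the B/C/A branches just mirror the original conditions, and the output tuple is flattened to (bid, ask) rather than the original nested shape:

val quoteRDD = quotefile
  .map(_.split("\\|"))
  .filter(cols => cols(0).contains("1017"))
  .map { cols =>
    // use the override column if present, otherwise fall back to the default column
    def orElseCol(overrideCol: String, defaultCol: String) =
      if (overrideCol.isEmpty) defaultCol else overrideCol

    val (bid, ask) = cols(15) match {
      case "B" => (orElseCol(cols(25), cols(9)), orElseCol(cols(37), cols(11)))
      case "C" => (orElseCol(cols(24), cols(9)), orElseCol(cols(30), cols(11)))
      case _   => (cols(9), cols(11))            // "A" and anything unexpected
    }

    (cols(5) + cols(4), (cols(5), cols(4), cols(1), bid, ask))
  }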
Re: Create hashmap using two RDD's
Got it — created the hashmap and saved it to a file. Please follow the steps below:

val QuoteRDD=quotefile.map(x => x.split("\\|")).
  filter(line => line(0).contains("1017")).
  map(x => ((x(5)+x(4)) , (x(5),x(4),x(1) ,
    if (x(15) =="B")
      ( {if (x(25) == "") x(9) else x(25)},
        {if (x(37) == "") x(11) else x(37)} )
    else if (x(15) =="C" )
      ( {if (x(24) == "") (x(9)) else x(24)},
        {if (x(30) == "") (x(11)) else x(30)} )
    else if (x(15) =="A")
      {(x(9),x(11))}
  )))

val QuoteHashMap=QuoteRDD.collect().toMap
val test=QuoteHashMap.values.toSeq
val test2=sc.parallelize(test.map(x => x.toString.replace("(","").replace(")","")))
test2.saveAsTextFile("C:\\Users\\kalit_000\\Desktop\\mkdata\\test.txt")
test2.collect().foreach(println)
Re: Create hashmap using two RDD's
Hi All, I changed my approach and now I am able to load the data into a Map and get data out using get:

val QuoteRDD=quotefile.map(x => x.split("\\|")).
  filter(line => line(0).contains("1017")).
  map(x => ((x(5)+x(4)) , (x(5),x(4),x(1) ,
    if (x(15) =="B")
      if (x(25) =="") x(9) else (x(25)),
      if (x(37) =="") x(11) else (x(37)),
    if (x(15) =="C")
      if (x(24) =="") x(9) else (x(24)),
      if (x(30) =="") x(11) else (x(30)),
    if (x(15) =="A")
      x(9),
    x(11)
  )))

val QuoteHashMap=QuoteRDD.collect().toMap
QuoteHashMap.get("CPHI08173").foreach(println)

The problem now is how to save the value data from the hashmap to a file — I need to iterate over the keys in the hashmap and save the values to a file.

for ( ((k,v)) <- QuoteHashMap)
  QuoteHashMap.get(k)

Thanks
Sri
Create hashmap using two RDD's
Hi all, I am trying to create a hashmap using two RDDs, but I am getting a "key not found" error — do I need to convert the RDDs to lists first?

1) one RDD has the key data
2) the other RDD has the value data

Key RDD:

val quotekey=file.map(x => x.split("\\|")).filter(line => line(0).contains("1017")).map(x => x(5)+x(4))

Value RDD:

val QuoteRDD=quotefile.map(x => x.split("\\|")).
  filter(line => line(0).contains("1017")).
  map(x => (x(5).toString+x(4).toString,x(5).toString,x(4).toString,x(1).toString ,
    if (x(15).toString =="B")
      if (x(25).toString =="") x(9).toString else (x(25).toString),
      if (x(37).toString =="") x(11).toString else (x(37).toString),
    if (x(15).toString =="C")
      if (x(24).toString =="") x(9).toString else (x(24).toString),
      if (x(30).toString =="") x(11).toString else (x(30).toString),
    if (x(15).toString =="A")
      x(9).toString,
    x(11).toString
  ))

Hash map:

val quotehash = new HashMap[String,String]
quotehash + quotekey.toString() -> QuoteRDD
quotehash("CPHI08172")

Error:
key not found: CPHI08172

Thanks
Re: KafkaProducer using Cassandra as source
Guys, sorry — I figured it out.

val x=p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")

Full code:

package com.examples

/**
 * Created by kalit_000 on 22/09/2015.
 */
import kafka.producer.KeyedMessage
import kafka.producer.Producer
import kafka.producer.ProducerConfig
import java.util.Properties
import _root_.kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkProducerDBCassandra {
  case class TestTable (TRADE_ID:String,TRADE_PRICE: String)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local[2]").setAppName("testkali2").set("spark.cassandra.connection.host", "127.0.0.1")
    val sc=new SparkContext("local","test",conf)
    //val ssc= new StreamingContext(sc,Seconds(2))

    print("Test kali Spark Cassandra")

    val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)
    val p=cc.sql("select * from people.person")
    p.collect().foreach(println)

    val props:Properties = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val config= new ProducerConfig(props)
    val producer= new Producer[String,String](config)

    val x=p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")
    producer.send(new KeyedMessage[String, String]("trade", x))

    //p.collect().foreach(print)
    //ssc.start()
    //ssc.awaitTermination()
  }
}
Re: Kafka createDirectStream issue
Hi, I am trying to develop the same code in IntelliJ IDEA and I am having the same issue — is there any workaround?

Error in IntelliJ: cannot resolve symbol createDirectStream

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.streaming._

object SparkKafkaOffsetTest {
  def main(args: Array[String]): Unit = {
    //Logger.getLogger("org").setLevel(Level.WARN)
    //Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local").setAppName("KafkaOffsetStreaming").set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    val zkQuorm="localhost:2181"
    val group="test-group"
    val topics="first"
    val numThreads=1
    val broker="localhost:9091"

    val kafkaParams = Map[String, String]("metadata.broker.list" -> broker)
    val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap

    //val lineMap=KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)
    //val directKafkaStream=KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val messages= KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    //val directKafkaStream = KafkaUtils.createDirectStream[
    //  [key class], [value class], [key decoder class], [value decoder class] ](
    //  streamingContext, [map of Kafka parameters], [set of topics to consume])
  }
}

Thanks
Sri
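Two things that fixed the same red squiggle for me, for what it's worth: the spark-streaming-kafka artifact has to be on the compile classpath (it is not part of spark-core), and the direct stream takes a Set of topic names, not a comma-separated String. A sketch of the sbt dependencies (the version is a placeholder and should match the cluster's Spark version):

// build.sbt
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2"
)

Then in the code, something like KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("first")).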
Re: Unable to see my kafka spark streaming output
Hi All, I figured it out — I had forgotten to set the master to local[2]; at least two cores are required.

package com.examples

/**
 * Created by kalit_000 on 19/09/2015.
 */
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkStreamingKafka {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaStreaming").set("spark.executor.memory", "1g")
    val sc=new SparkContext(conf)
    val ssc= new StreamingContext(sc,Seconds(2))

    val zkQuorm="localhost:2181"
    val group="test-group"
    val topics="first"
    val numThreads=1

    val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap
    val lineMap=KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)
    val lines=lineMap.map(_._2)
    lines.print

    //val words=lines.flatMap(_.split(" "))
    //val pair=words.map( x => (x,1))
    //val wordcount=pair.reduceByKeyAndWindow(_+_,_-_,Minutes(1),Seconds(2),2)
    //wordcount.print

    //ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")
    ssc.checkpoint("C:\\scalatutorials\\sparkstreaming_checkpoint_folder")

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks
Sri
Unable to see my kafka spark streaming output
Hi All, I am unable to see the output getting printed to the console — can anyone help?

package com.examples

/**
 * Created by kalit_000 on 19/09/2015.
 */
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkStreamingKafka {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local").setAppName("KafkaStreaming").set("spark.executor.memory", "1g")
    val sc=new SparkContext(conf)
    val ssc= new StreamingContext(sc,Seconds(2))

    val zkQuorm="localhost:2181"
    val group="test-group"
    val topics="first"
    val numThreads=1

    val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap
    val lineMap=KafkaUtils.createStream(ssc,zkQuorm,group,topicMap)
    val lines=lineMap.map(_._2)
    //lines.print()

    val words=lines.flatMap(_.split(" "))
    val pair=words.map( x => (x,1))
    val wordcount=pair.reduceByKeyAndWindow(_+_,_-_,Minutes(1),Seconds(2),2)
    wordcount.print

    //ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")
    ssc.checkpoint("C:\\scalatutorials\\sparkstreaming_checkpoint_folder")

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks
Sri
word count (group by users) in spark
Hi All, I would like to achieve the output below using Spark. I managed to write it in Hive and call it from Spark, but not in pure Spark (Scala). How do I group word counts by a particular user (column)? For example, imagine users and their tweets: I want to do a word count per user name.

Input:
kali  A,B,A,B,B
james B,A,A,A,B

Output:
kali  A [Count] B [Count]
james A [Count] B [Count]

My Hive answer:

CREATE EXTERNAL TABLE TEST
(
  user_name string,
  COMMENTS  STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
STORED AS TEXTFILE
LOCATION '/data/kali/test';

(Create the HDFS folder and a text file with the data mentioned above.)

use default;
select user_name, COLLECT_SET(text)
from (select user_name, concat(sub,' ',count(comments)) as text
      from test LATERAL VIEW explode(split(comments,',')) subView AS sub
      group by user_name, sub) w
group by user_name;

Spark with Hive:

package com.examples

/**
 * Created by kalit_000 on 17/09/2015.
 */
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object HiveWordCount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)
    val sqlContext= new SQLContext(sc)
    val hc=new HiveContext(sc)

    hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST (user_name string, COMMENTS STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' STORED AS TEXTFILE LOCATION '/data/kali/test' ")

    val op=hc.sql("select user_name,COLLECT_SET(text) from (select user_name,concat(sub,' ',count(comments)) as text from default.test LATERAL VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w group by user_name")

    op.collect.foreach(println)
  }
}

Thanks
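A rough pure-RDD/Scala sketch of the same per-user word count, in case it helps. It assumes each line is the user name, the \001 delimiter, then the comma-separated words, matching the external table definition above — adjust the first split if the raw file uses a different separator:

val lines = sc.textFile("/data/kali/test")            // same files the external table points at

val perUser = lines
  .map(_.split("\u0001"))                             // -> Array(user_name, "A,B,A,B,B")
  .flatMap(a => a(1).split(",").map(w => ((a(0), w.trim), 1)))
  .reduceByKey(_ + _)                                  // ((user, word), count)
  .map { case ((user, word), n) => (user, word + " " + n) }
  .groupByKey()                                        // user -> ["A 2", "B 3"]

perUser.collect().foreach { case (user, counts) => println(user + " " + counts.mkString(" ")) }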
split function on spark sql created rdd
Hi All, I am trying to do a word count on a number of tweets. My first step is to get the data from a table using Spark SQL and then run a split function on top of it to calculate the word count.

Error: value split is not a member of org.apache.spark.sql.SchemaRDD

Spark code that doesn't work for the word count:

val distinct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))

// tried split on both RDDs, neither worked
distinct_tweets.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
distinct_tweets_List.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

But when I write the data from Spark SQL out to a file, load it again and then run split, it works.

Example code that works:

val distinct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val rdd=sc.parallelize(distinct_tweets_op)
rdd.saveAsTextFile("/home/cloudera/bdp/op")
val textFile=sc.textFile("/home/cloudera/bdp/op/part-0")
val counts=textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.saveAsTextFile("/home/cloudera/bdp/wordcount")

I don't want to write to a file; instead I want to collect into an RDD and apply a filter function on top of the SchemaRDD — is there a way?

Thanks
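For what it's worth, the error seems to be because the elements of a SchemaRDD are Row objects rather than Strings, so split has nothing to work on; pulling the text column out of each Row first avoids the round trip through a file. A sketch (untested against this table):

import org.apache.spark.SparkContext._   // pair RDD functions on older Spark versions

val distinctTweets = hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")

val counts = distinctTweets
  .map(row => row.getString(0))           // Row -> the tweet text column
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.saveAsTextFile("/home/cloudera/bdp/wordcount")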