Structured Streaming to Kafka Topic

2019-03-06 Thread Pankaj Wahane
Hi,

I am using structured streaming for ETL.


val data_stream = spark
  .readStream // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from start of topic
  .option("failOnDataLoss", "false")
  .load()

I transform this into a Dataset with the following schema.

root
 |-- accountId: long (nullable = true)
 |-- countryId: long (nullable = true)
 |-- credits: double (nullable = true)
 |-- deliveryStatus: string (nullable = true)
 |-- senderId: string (nullable = true)
 |-- sentStatus: string (nullable = true)
 |-- source: integer (nullable = true)
 |-- createdOn: timestamp (nullable = true)
 |-- send_success_credits: double (nullable = true)
 |-- send_error_credits: double (nullable = true)
 |-- delivered_credits: double (nullable = true)
 |-- invalid_sd_credits: double (nullable = true)
 |-- undelivered_credits: double (nullable = true)
 |-- unknown_credits: double (nullable = true)
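
For context, a minimal Scala sketch of one way such a transform is often written, assuming the Kafka value carries a JSON payload; the SmsRecord case class below mirrors the printed schema but is otherwise an assumption, since the actual transform is not shown:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, from_json}
import spark.implicits._

// Hypothetical case class mirroring the schema printed above.
case class SmsRecord(accountId: Long, countryId: Long, credits: Double,
                     deliveryStatus: String, senderId: String, sentStatus: String,
                     source: Int, createdOn: java.sql.Timestamp,
                     send_success_credits: Double, send_error_credits: Double,
                     delivered_credits: Double, invalid_sd_credits: Double,
                     undelivered_credits: Double, unknown_credits: Double)

val smsSchema = Encoders.product[SmsRecord].schema

// Parse the Kafka value (assumed to be JSON) into typed columns.
val sms_ds = data_stream
  .select(from_json(col("value").cast("string"), smsSchema).alias("r"))
  .select("r.*")
  .as[SmsRecord]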


Now I want to write this transformed stream to another Kafka topic. I have 
temporarily used a UDF that accepts all these columns as parameters and creates 
a JSON string to add a "value" column for writing to Kafka.

Is there an easier and cleaner way to do the same?
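
For reference, one commonly suggested cleaner alternative is to pack all columns into the "value" column with to_json(struct(...)) and write with the built-in Kafka sink. A minimal Scala sketch, assuming Spark 2.2+ and a transformed Dataset called sms_ds (a placeholder name); the output topic and checkpoint path are also placeholders:

import org.apache.spark.sql.functions.{col, struct, to_json}

// Serialize every column into a single JSON string named "value",
// which is the column the Kafka sink expects.
val toKafka = sms_ds
  .select(to_json(struct(sms_ds.columns.map(col): _*)).alias("value"))

// Write the stream to another topic (topic name and checkpoint path are placeholders).
val query = toKafka.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_transformed")
  .option("checkpointLocation", "/tmp/checkpoints/sms_history_transformed")
  .start()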


Thanks,
Pankaj



Re: how to add column to dataframe

2016-12-06 Thread Pankaj Wahane
You may want to try using df2.na.fill(…)
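
A minimal Scala sketch of that suggestion against the df2 from the question below; the replacement value "unknown" is just a placeholder:

// na.fill replaces nulls; the second argument limits it to the listed columns.
val filled = df2.na.fill("unknown", Seq("pa_bid"))
filled.select("pa_bid", "url").show()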

From: lk_spark 
Date: Tuesday, 6 December 2016 at 3:05 PM
To: "user.spark" 
Subject: how to add column to dataframe

Hi all,
   my Spark version is 2.0.
   I have a parquet file with one column named url, of type string. I want to get a 
substring from the url and add it to the dataframe:
   val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
   val df2 = df.withColumn("pa_bid",when($"url".isNull,col("url").substr(3, 5)))
   df2.select("pa_bid","url").show
+------+----------------+
|pa_bid|             url|
+------+----------------+
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|

Why is what I got null?

2016-12-06

lk_spark


Re: Spark Streaming: java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream

2016-03-11 Thread Pankaj Wahane
The next thing you may want to check is whether the jar has been provided to all the 
executors in your cluster. Most of the class-not-found errors were resolved for 
me after making the required jars available in the SparkContext.
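
For reference, a minimal sketch of the two usual ways to do that; the jar path below is a placeholder:

// Option 1: ship the jar at submit time so every executor gets it:
//   spark-submit --jars /path/to/kafka-clients-0.8.2.0.jar ...
// Option 2: register it on the SparkContext from the driver; tasks scheduled
// afterwards can then load classes from it.
sc.addJar("/path/to/kafka-clients-0.8.2.0.jar")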

Thanks.

From: Ted Yu
Date: Saturday, 12 March 2016 at 7:17 AM
To: Siva
Cc: spark users
Subject: Re: Spark Streaming: java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream

KafkaLZ4BlockOutputStream is in the kafka-clients jar:

$ jar tvf kafka-clients-0.8.2.0.jar | grep KafkaLZ4BlockOutputStream
  1609 Wed Jan 28 22:30:36 PST 2015 org/apache/kafka/common/message/KafkaLZ4BlockOutputStream$BD.class
  2918 Wed Jan 28 22:30:36 PST 2015 org/apache/kafka/common/message/KafkaLZ4BlockOutputStream$FLG.class
  4578 Wed Jan 28 22:30:36 PST 2015 org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.class

Can you check whether the kafka-clients jar was in the classpath of the container?
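
One rough way to check that from the driver is to print the JVM classpath seen inside an executor task; a minimal Scala sketch, offered here purely as a debugging aid rather than something from the original thread:

// Runs a single task on an executor and returns that JVM's launch classpath.
sc.parallelize(Seq(1), 1)
  .map(_ => System.getProperty("java.class.path"))
  .collect()
  .foreach(println)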

Thanks

On Fri, Mar 11, 2016 at 5:00 PM, Siva wrote:
Hi Everyone,

All of a sudden we are encountering the below error from one of the Spark 
consumers. It used to work before without any issues.

When I restart the consumer with the latest offsets, it works fine for 
some time (it executes a few batches) and then fails again; this issue is 
intermittent.

Did anyone come across this issue?

16/03/11 19:44:50 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 3, ip-172-31-32-183.us-west-2.compute.internal): java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream
at kafka.message.ByteBufferMessageSet$.decompress(ByteBufferMessageSet.scala:65)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:179)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:192)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:146)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:847)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:615)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:160)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.message.KafkaLZ4BlockOutputStream
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 23 more


Container id: container_1456361466298_0236_01_02
Exit code: 50
Stack trace: ExitCodeException exitCode=50:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code

Re: Question on take function - Spark Java API

2015-08-26 Thread Pankaj Wahane
Thanks Sonal.. I shall try doing that..

 On 26-Aug-2015, at 1:05 pm, Sonal Goyal sonalgoy...@gmail.com wrote:
 
 You can try using wholeTextFiles, which will give you a pair RDD of (fileName, 
 content). flatMap through this and manipulate the content. 
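 
 A minimal Scala sketch of that approach, assuming an existing SparkContext sc; the folder path is a placeholder. (The original code is Java, but the same wholeTextFiles call exists on JavaSparkContext.)
 
 // wholeTextFiles yields one (filePath, fileContent) pair per file, so the asset
 // name on the first line can be attached to every data row of that same file.
 val tagged = sc.wholeTextFiles("/data/asset_folder")
   .flatMap { case (_, content) =>
     val lines = content.split("\n")
     val asset = lines.head.trim            // line 1: asset name
     lines.drop(2)                          // skip asset name and CSV header
       .filter(_.trim.nonEmpty)
       .map(row => (asset, row))            // tag each CSV row with its asset
   }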
 
 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co/ 
 Check out Reifier at Spark Summit 2015 
 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/
 
  http://in.linkedin.com/in/sonalgoyal
 
 
 
 On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane pankaj.wah...@qiotec.com wrote:
 Hi community members,
 
 
 Apache Spark is Fantastic and very easy to learn.. Awesome work!!!
 
 Question:
 
 I have multiple files in a folder, and the first line in each file is the 
 name of the asset that the file belongs to. The second line is the CSV header 
 row, and the data starts from the third row.
 
 Ex: File 1
 
 TestAsset01
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,123,456,789
 11-01-2015 15:00:01,123,456,789
 . . .
 
 Ex: File 2
 
 TestAsset02
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,1230,4560,7890
 11-01-2015 15:00:01,1230,4560,7890
 . . .
 
 I have nearly 1000 files in each folder, sizing ~10 GB.
 
 I am using the Apache Spark Java API to read all these files.
 
 Following is the code extract that I am using:
 
 try (JavaSparkContext sc = new JavaSparkContext(conf)) {
     Map<String, String> readingTypeMap = getReadingTypesMap(sc);
     // Read file
     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
     // Get asset (first line of the input)
     String asset = data.take(1).get(0);
     // Extract time series data
     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
     // Strip header
     String header = actualData.take(1).get(0);
     String[] headers = header.split(DELIMERTER);
     // Extract actual data
     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
     // Extract valid records
     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
     // Find granularity
     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
     // Transform to TSD objects
     JavaRDD<TimeSeriesData> tsdFlatMap =
         transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);

     // Save to Cassandra
     javaFunctions(tsdFlatMap)
         .writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
             "time_series_data",
             mapToRow(TimeSeriesData.class))
         .saveToCassandra();

     System.out.println("Total Records: " + timeSeriesLines.count());
     System.out.println("Valid Records: " + validated.count());
 }
 Within the TimeSeriesData object I need to set the asset name for the reading, 
 so I need the output of data.take(1) to be different for different files.
 
 
 Thank You.
 
 Best Regards,
 Pankaj
 
 
 
 




Question on take function - Spark Java API

2015-08-25 Thread Pankaj Wahane
Hi community members,


 Apache Spark is Fantastic and very easy to learn.. Awesome work!!!
 
 Question:
 
 I have multiple files in a folder, and the first line in each file is the name 
 of the asset that the file belongs to. The second line is the CSV header row, and 
 the data starts from the third row.
 
 Ex: File 1
 
 TestAsset01
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,123,456,789
 11-01-2015 15:00:01,123,456,789
 . . .
 
 Ex: File 2
 
 TestAsset02
 Time,dp_1,dp_2,dp_3
 11-01-2015 15:00:00,1230,4560,7890
 11-01-2015 15:00:01,1230,4560,7890
 . . .
 
 I have nearly 1000 files in each folder, sizing ~10 GB.
 
 I am using the Apache Spark Java API to read all these files.
 
 Following is the code extract that I am using:
 
 try (JavaSparkContext sc = new JavaSparkContext(conf)) {
     Map<String, String> readingTypeMap = getReadingTypesMap(sc);
     // Read file
     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
     // Get asset (first line of the input)
     String asset = data.take(1).get(0);
     // Extract time series data
     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
     // Strip header
     String header = actualData.take(1).get(0);
     String[] headers = header.split(DELIMERTER);
     // Extract actual data
     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
     // Extract valid records
     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
     // Find granularity
     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
     // Transform to TSD objects
     JavaRDD<TimeSeriesData> tsdFlatMap =
         transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);

     // Save to Cassandra
     javaFunctions(tsdFlatMap)
         .writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
             "time_series_data",
             mapToRow(TimeSeriesData.class))
         .saveToCassandra();

     System.out.println("Total Records: " + timeSeriesLines.count());
     System.out.println("Valid Records: " + validated.count());
 }
 Within the TimeSeriesData object I need to set the asset name for the reading, so 
 I need the output of data.take(1) to be different for different files.
 
 
 Thank You.
 
 Best Regards,
 Pankaj
 
 

