Structured Streaming to Kafka Topic
Hi,

I am using Structured Streaming for ETL:

val data_stream = spark
  .readStream                               // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest")    // begin from start of topic
  .option("failOnDataLoss", "false")
  .load()

I transform this into a Dataset with the following schema:

root
 |-- accountId: long (nullable = true)
 |-- countryId: long (nullable = true)
 |-- credits: double (nullable = true)
 |-- deliveryStatus: string (nullable = true)
 |-- senderId: string (nullable = true)
 |-- sentStatus: string (nullable = true)
 |-- source: integer (nullable = true)
 |-- createdOn: timestamp (nullable = true)
 |-- send_success_credits: double (nullable = true)
 |-- send_error_credits: double (nullable = true)
 |-- delivered_credits: double (nullable = true)
 |-- invalid_sd_credits: double (nullable = true)
 |-- undelivered_credits: double (nullable = true)
 |-- unknown_credits: double (nullable = true)

Now I want to write this transformed stream to another Kafka topic. As a temporary workaround I use a UDF that accepts all these columns as parameters and creates a JSON string, which becomes the "value" column written to Kafka. Is there an easier and cleaner way to do the same?

Thanks,
Pankaj
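A common alternative to a hand-written UDF is to pack all columns with struct and serialize them with to_json before handing the result to the Kafka sink. This is only a sketch, assuming Spark 2.1 or later (where to_json is available); transformedDs, the output topic name and the checkpoint path are placeholders:

import org.apache.spark.sql.functions.{col, struct, to_json}

// Pack every column of the transformed Dataset into a single JSON string
// column named "value" -- the Kafka sink reads the payload from that column.
val kafkaReady = transformedDs
  .select(to_json(struct(transformedDs.columns.map(col): _*)).alias("value"))

// Write the stream to another topic; checkpointLocation is required for the Kafka sink.
val query = kafkaReady.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_transformed")                    // placeholder topic name
  .option("checkpointLocation", "/tmp/checkpoints/sms_history")  // placeholder path
  .start()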
Re: how to add column to dataframe
You may want to try using df2.na.fill(…)

From: lk_spark
Date: Tuesday, 6 December 2016 at 3:05 PM
To: "user.spark"
Subject: how to add column to dataframe

Hi all,

My Spark version is 2.0. I have a parquet file with one column named url, of type string. I want to take a substring of the url and add it to the dataframe:

val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
val df2 = df.withColumn("pa_bid", when($"url".isNull, col("url").substr(3, 5)))
df2.select("pa_bid", "url").show

+------+----------------+
|pa_bid|             url|
+------+----------------+
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|

Why is what I get always null?

2016-12-06
lk_spark
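For what it's worth, when($"url".isNull, ...) only evaluates the substring for rows where url IS null and yields null for every other row (there is no otherwise clause), which is why pa_bid comes back null everywhere. A minimal sketch of what was probably intended, combined with the na.fill suggestion above (the isNotNull condition and the "unknown" default are assumptions):

import org.apache.spark.sql.functions.{col, when}
import spark.implicits._   // for the $"..." column syntax

// Compute the substring only when url is present; rows with a null url stay null.
val df2 = df.withColumn("pa_bid", when($"url".isNotNull, $"url".substr(3, 5)))

// Then, as suggested above, na.fill replaces any remaining nulls in pa_bid
// ("unknown" here is just an illustrative default value).
val df3 = df2.na.fill("unknown", Seq("pa_bid"))

df3.select("pa_bid", "url").show()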
Re: Spark Streaming: java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream
The next thing you may want to check is whether the jar has been provided to all the executors in your cluster. Most of the class-not-found errors got resolved for me after making the required jars available to the SparkContext. Thanks.

From: Ted Yu
Date: Saturday, 12 March 2016 at 7:17 AM
To: Siva
Cc: spark users
Subject: Re: Spark Streaming: java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream

KafkaLZ4BlockOutputStream is in the kafka-clients jar:

$ jar tvf kafka-clients-0.8.2.0.jar | grep KafkaLZ4BlockOutputStream
  1609 Wed Jan 28 22:30:36 PST 2015 org/apache/kafka/common/message/KafkaLZ4BlockOutputStream$BD.class
  2918 Wed Jan 28 22:30:36 PST 2015 org/apache/kafka/common/message/KafkaLZ4BlockOutputStream$FLG.class
  4578 Wed Jan 28 22:30:36 PST 2015 org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.class

Can you check whether the kafka-clients jar was on the classpath of the container?

Thanks

On Fri, Mar 11, 2016 at 5:00 PM, Siva wrote:

Hi Everyone,

All of a sudden we are encountering the error below from one of the Spark consumers. It used to work without any issues. When I restart the consumer with the latest offsets, it works fine for some time (it executes a few batches) and then fails again; the issue is intermittent. Did anyone come across this issue?

16/03/11 19:44:50 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 3, ip-172-31-32-183.us-west-2.compute.internal): java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream
    at kafka.message.ByteBufferMessageSet$.decompress(ByteBufferMessageSet.scala:65)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:179)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:192)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:146)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:847)
    at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:615)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:160)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.message.KafkaLZ4BlockOutputStream
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 23 more

Container id: container_1456361466298_0236_01_02
Exit code: 50
Stack trace: ExitCodeException exitCode=50:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit
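One way to get the missing kafka-clients classes onto the driver and every executor (a sketch only; the master, main class and paths below are placeholders) is to ship the jar with spark-submit via --jars, or alternatively to bundle it into the application's assembly/uber jar:

# Ship kafka-clients alongside the application so executors can load it
spark-submit \
  --master yarn \
  --class com.example.StreamingConsumer \
  --jars /path/to/kafka-clients-0.8.2.0.jar \
  /path/to/my-streaming-app.jar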
Re: Question on take function - Spark Java API
Thanks Sonal, I shall try doing that.

On 26-Aug-2015, at 1:05 pm, Sonal Goyal sonalgoy...@gmail.com wrote:

You can try using wholeTextFiles, which will give you a pair RDD of (fileName, content). flatMap through this and manipulate the content.

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co/
Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/
http://in.linkedin.com/in/sonalgoyal

On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane pankaj.wah...@qiotec.com wrote:

Hi community members,

Apache Spark is fantastic and very easy to learn. Awesome work!

Question: I have multiple files in a folder, and the first line in each file is the name of the asset that the file belongs to. The second line is the CSV header row, and data starts from the third row. For example:

File 1
TestAsset01
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,123,456,789
11-01-2015 15:00:01,123,456,789
.
.
.

File 2
TestAsset02
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,1230,4560,7890
11-01-2015 15:00:01,1230,4560,7890
.
.
.

I have nearly 1000 files in each folder, sizing ~10G. I am using the Apache Spark Java API to read all these files. Following is the code extract I am using:

try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    Map<String, String> readingTypeMap = getReadingTypesMap(sc);
    // Read files
    JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
    // Get asset
    String asset = data.take(1).get(0);
    // Extract time series data
    JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
    // Strip header
    String header = actualData.take(1).get(0);
    String[] headers = header.split(DELIMERTER);
    // Extract actual data
    JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
    // Extract valid records
    JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
    // Find granularity
    Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
    // Transform to TSD objects
    JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);
    // Save to Cassandra
    javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
        "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();
    System.out.println("Total Records: " + timeSeriesLines.count());
    System.out.println("Valid Records: " + validated.count());
}

Within the TimeSeriesData object I need to set the asset name for the reading, so I need the output of data.take(1) to be different for different files.

Thank You.

Best Regards,
Pankaj
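Following up on the wholeTextFiles suggestion, here is a minimal sketch of the idea (shown in Scala for brevity; wholeTextFiles and flatMap exist in the Java API as well, and the folder path is a placeholder). Each element is a (fileName, fullContent) pair, so the asset name on the first line stays attached to that file's own rows:

// Read every file in the folder as a (fileName, fullContent) pair
val perFile = sc.wholeTextFiles("/path/to/folder")

// Tag each data row with the asset name taken from its own file's first line
val timeSeriesLines = perFile.flatMap { case (fileName, content) =>
  val lines = content.split("\n")
  val asset = lines.head.trim        // line 1: asset name, e.g. TestAsset01
  lines.drop(2)                      // skip the asset name and CSV header rows
       .map(row => (asset, row))
}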
Question on take function - Spark Java API
Hi community members,

Apache Spark is fantastic and very easy to learn. Awesome work!

Question: I have multiple files in a folder, and the first line in each file is the name of the asset that the file belongs to. The second line is the CSV header row, and data starts from the third row. For example:

File 1
TestAsset01
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,123,456,789
11-01-2015 15:00:01,123,456,789
.
.
.

File 2
TestAsset02
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,1230,4560,7890
11-01-2015 15:00:01,1230,4560,7890
.
.
.

I have nearly 1000 files in each folder, sizing ~10G. I am using the Apache Spark Java API to read all these files. Following is the code extract I am using:

try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    Map<String, String> readingTypeMap = getReadingTypesMap(sc);
    // Read files
    JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
    // Get asset
    String asset = data.take(1).get(0);
    // Extract time series data
    JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
    // Strip header
    String header = actualData.take(1).get(0);
    String[] headers = header.split(DELIMERTER);
    // Extract actual data
    JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
    // Extract valid records
    JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
    // Find granularity
    Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
    // Transform to TSD objects
    JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);
    // Save to Cassandra
    javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
        "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();
    System.out.println("Total Records: " + timeSeriesLines.count());
    System.out.println("Valid Records: " + validated.count());
}

Within the TimeSeriesData object I need to set the asset name for the reading, so I need the output of data.take(1) to be different for different files.

Thank You.

Best Regards,
Pankaj