Re: Spark Streaming fails with unable to get records after polling for 512 ms
Hi Cody,

It worked after moving the parameter to SparkConf; I don't see that error any more. But now the count for each RDD returns 0, even though there are records in the topic I'm reading. Do you see anything wrong with how I'm creating the direct stream?

Thanks
Jagadish

On Wed, Nov 15, 2017 at 11:23 AM, Cody Koeninger wrote:
> spark.streaming.kafka.consumer.poll.ms is a spark configuration, not
> a kafka parameter.
>
> see http://spark.apache.org/docs/latest/configuration.html
>
> On Tue, Nov 14, 2017 at 8:56 PM, jkagitala wrote:
> > Hi,
> >
> > I'm trying to add spark-streaming to our kafka topic, but I keep getting
> > this error:
> > java.lang.AssertionError: assertion failed: Failed to get record after
> > polling for 512 ms.
> >
> > I tried adding different params, like max.poll.interval.ms and
> > spark.streaming.kafka.consumer.poll.ms, to kafkaParams, but I still get
> > "failed to get records after 512 ms". Adding those params doesn't seem
> > to change the polling time at all.
> >
> > Without spark-streaming I'm able to fetch the records; only with the
> > spark-streaming add-on do I get this error.
> >
> > Any help is greatly appreciated. Below is the code I'm using.
> >
> > SparkConf sparkConf = new SparkConf()
> >     .setAppName("JavaFlingerSparkApplication")
> >     .setMaster("local[*]");
> > JavaStreamingContext ssc =
> >     new JavaStreamingContext(sparkConf, Durations.seconds(10));
> >
> > kafkaParams.put("bootstrap.servers", hosts);
> > kafkaParams.put("group.id", groupid);
> > kafkaParams.put("auto.commit.enable", false);
> > kafkaParams.put("key.deserializer", StringDeserializer.class);
> > kafkaParams.put("value.deserializer", BytesDeserializer.class);
> > kafkaParams.put("auto.offset.reset", "earliest");
> > //kafkaParams.put("max.poll.interval.ms", 12000);
> > //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
> > //kafkaParams.put("request.timeout.ms", 12000);
> >
> > JavaInputDStream<ConsumerRecord<String, Bytes>> messages =
> >     KafkaUtils.createDirectStream(ssc,
> >         LocationStrategies.PreferConsistent(),
> >         ConsumerStrategies.Subscribe(topics, kafkaParams));
> > messages.foreachRDD(rdd -> {
> >     List<ConsumerRecord<String, Bytes>> input = rdd.collect();
> >     System.out.println("count is " + input.size());
> > });
> > ssc.start();
> > ssc.awaitTermination();
> >
> > Thanks
> > Jagadish
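A minimal sketch of the fix described above, setting the poll timeout on SparkConf rather than in kafkaParams (the 12000 ms value mirrors the commented-out attempt in the quoted code):

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlingerSparkApplication")
        .setMaster("local[*]")
        // Spark property, so it belongs on SparkConf, not in kafkaParams.
        .set("spark.streaming.kafka.consumer.poll.ms", "12000");
JavaStreamingContext ssc =
        new JavaStreamingContext(sparkConf, Durations.seconds(10));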
Access to Applications metrics
Hello,

I'm wondering if it's possible to get access to the detailed job/stage/task-level metrics via the metrics system (JMX, Graphite, etc.). I've enabled the wildcard sink and I do not see them. It seems these values are only available over HTTP/JSON and to SparkListener instances; is this the case? Has anyone worked on a SparkListener that would bridge data from one to the other?

Thanks,
Nick
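A sketch of such a bridge, as a starting point: Spark invokes SparkListener callbacks with the task-level metrics, and the listener can forward them to whatever reporter is already running. The println below stands in for a Graphite/JMX client and is the only assumed piece; the listener API itself is public.

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

// Minimal sketch: forward per-task metrics to an external sink.
public class TaskMetricsBridge extends SparkListener {
    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        if (taskEnd.taskMetrics() != null) {
            // Replace the println with a push to your Graphite/JMX reporter.
            System.out.println("stage=" + taskEnd.stageId()
                    + " runTimeMs=" + taskEnd.taskMetrics().executorRunTime()
                    + " bytesRead=" + taskEnd.taskMetrics().inputMetrics().bytesRead());
        }
    }
}

The listener can be registered in code with spark.sparkContext().addSparkListener(new TaskMetricsBridge()), or by class name at startup via the spark.extraListeners configuration property.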
[Spark Core]: S3a with Openstack swift object storage not using credentials provided in sparkConf
Hey,

I am currently using Spark 2.2.0 for Hadoop 2.7.x in a standalone cluster for testing. I want to access some files and share them on the nodes of the cluster using addFiles. As local directories are not supported for this, I want to use S3 to do the job. In contrast to nearly everything I have found on the internet, I am using a self-hosted OpenStack cluster with Swift as the object storage. Accessing Swift directly would be fine too, but all tutorials I have found seem to use Keystone v2, whilst our deployment uses the v3 version.

I added the following jars:

aws-java-sdk-1.7.4.jar
hadoop-aws-2.7.3.jar

as jars and to the classpath of each executor and driver. When I try to access an S3 bucket, the following exception occurs: "Unable to load AWS credentials from any provider in the chain"

This is my config:

conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
conf.set("fs.s3a.endpoint", "https://foo/swift/v1")
conf.set("fs.s3a.access.key", System.getenv("s3Access"))
conf.set("fs.s3a.secret.key", System.getenv("s3Secret"))

From my understanding, the S3 handler is not using the provided credentials. Has anyone an idea how to fix this?

Cheers, and thanks in advance
Marius
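One thing stands out in the config above: only the fs.s3a.impl key carries the spark.hadoop. prefix. SparkConf entries are only copied into the Hadoop Configuration that S3AFileSystem reads when they are prefixed with spark.hadoop., so the endpoint and credential keys as written likely never reach the S3A client. A minimal sketch of the prefixed config, keeping the endpoint and environment variables from the post above:

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf();
// Every fs.s3a.* key needs the "spark.hadoop." prefix to be copied into
// the Hadoop Configuration that S3AFileSystem actually reads.
conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
conf.set("spark.hadoop.fs.s3a.endpoint", "https://foo/swift/v1");
conf.set("spark.hadoop.fs.s3a.access.key", System.getenv("s3Access"));
conf.set("spark.hadoop.fs.s3a.secret.key", System.getenv("s3Secret"));

Equivalently, the unprefixed keys can be set directly on sparkContext.hadoopConfiguration() after the context is created.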
Parquet files from spark not readable in Cascading
Hi,

When I tried reading Parquet data that was generated by Spark in Cascading, it throws the following error:

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file ""
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
	at org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.<init>(DeprecatedParquetInputFormat.java:103)
	at org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:47)
	at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:253)
	at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:248)
	at cascading.util.Util.retry(Util.java:1044)
	at cascading.tap.hadoop.io.MultiInputFormat.getRecordReader(MultiInputFormat.java:247)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:394)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
	at java.util.ArrayList.elementData(ArrayList.java:418)
	at java.util.ArrayList.get(ArrayList.java:431)
	at org.apache.parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:98)
	at org.apache.parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:98)
	at org.apache.parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:83)
	at org.apache.parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:77)
	at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:293)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
	at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
	at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)

This is mostly seen when the Parquet data has nested structures, and I didn't find any solution to it. I see some JIRA issues like https://issues.apache.org/jira/browse/SPARK-10434 (Parquet compatibility/interoperability issues), where Spark 1.4 could not read Parquet files generated by Spark 1.5. That was fixed in later Spark versions, but was it fixed in Cascading? I am not sure whether this is a Parquet version issue, a Cascading bug, or Spark doing something with Parquet files that Cascading does not accept.

Note: I am trying to read Parquet with an Avro schema in Cascading. I have posted to the Cascading mailing list too.

--
Thanks
Vikas Gandham
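One avenue worth trying on the Spark side, offered as a guess rather than a confirmed fix: Spark SQL has a spark.sql.parquet.writeLegacyFormat option that writes nested types in the older, parquet-hive/parquet-avro compatible layout, and the newer layout is a common cause of "Can not read value at 0 in block -1" in non-Spark readers of nested schemas. A minimal sketch of rewriting the data that way (paths are hypothetical):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LegacyParquetRewrite {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("legacy-parquet-rewrite")
                .master("local[*]")
                .getOrCreate();
        // Must be set before the write; affects how nested types are encoded.
        spark.conf().set("spark.sql.parquet.writeLegacyFormat", "true");
        Dataset<Row> df = spark.read().parquet("/path/to/spark-written-parquet");
        df.write().parquet("/path/to/legacy-parquet");
        spark.stop();
    }
}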
Re: Spark Streaming fails with unable to get records after polling for 512 ms
spark.streaming.kafka.consumer.poll.ms is a spark configuration, not
a kafka parameter.

see http://spark.apache.org/docs/latest/configuration.html

On Tue, Nov 14, 2017 at 8:56 PM, jkagitala wrote:
> Hi,
>
> I'm trying to add spark-streaming to our kafka topic, but I keep getting
> this error:
> java.lang.AssertionError: assertion failed: Failed to get record after
> polling for 512 ms.
>
> I tried adding different params, like max.poll.interval.ms and
> spark.streaming.kafka.consumer.poll.ms, to kafkaParams, but I still get
> "failed to get records after 512 ms". Adding those params doesn't seem
> to change the polling time at all.
>
> Without spark-streaming I'm able to fetch the records; only with the
> spark-streaming add-on do I get this error.
>
> Any help is greatly appreciated. Below is the code I'm using.
>
> SparkConf sparkConf = new SparkConf()
>     .setAppName("JavaFlingerSparkApplication")
>     .setMaster("local[*]");
> JavaStreamingContext ssc =
>     new JavaStreamingContext(sparkConf, Durations.seconds(10));
>
> kafkaParams.put("bootstrap.servers", hosts);
> kafkaParams.put("group.id", groupid);
> kafkaParams.put("auto.commit.enable", false);
> kafkaParams.put("key.deserializer", StringDeserializer.class);
> kafkaParams.put("value.deserializer", BytesDeserializer.class);
> kafkaParams.put("auto.offset.reset", "earliest");
> //kafkaParams.put("max.poll.interval.ms", 12000);
> //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
> //kafkaParams.put("request.timeout.ms", 12000);
>
> JavaInputDStream<ConsumerRecord<String, Bytes>> messages =
>     KafkaUtils.createDirectStream(ssc,
>         LocationStrategies.PreferConsistent(),
>         ConsumerStrategies.Subscribe(topics, kafkaParams));
> messages.foreachRDD(rdd -> {
>     List<ConsumerRecord<String, Bytes>> input = rdd.collect();
>     System.out.println("count is " + input.size());
> });
> ssc.start();
> ssc.awaitTermination();
>
> Thanks
> Jagadish
Re: Process large JSON file without causing OOM
Thanks Steve and Vadim for the feedback.

@Steve, are you suggesting creating a custom receiver and somehow piping it through Spark Streaming/Spark SQL? Or are you suggesting creating smaller datasets from the stream and using my original code to process those smaller datasets? It'd be very helpful for a novice like myself if you could provide code samples or links to docs/articles.

@Vadim, I ran my test with local[1] and got OOM in the same place. What puzzles me is that when I inspect the heap dump with VisualVM (see below), it says the heap is pretty small, ~35MB. I am running my test with "-Xmx10G -Dspark.executor.memory=6g -Dspark.driver.memory=6g" JVM opts and I can see them reflected in the Spark UI. Am I missing some memory settings?

Date taken: Wed Nov 15 10:46:06 MST 2017
File: /tmp/java_pid69786.hprof
File size: 59.5 MB
Total bytes: 39,728,337
Total classes: 15,749
Total instances: 437,979
Classloaders: 123
GC roots: 2,831
Number of objects pending for finalization: 5,198

Thanks,
Alec

On Wed, Nov 15, 2017 at 11:15 AM, Vadim Semenov wrote:
> There's a lot of off-heap memory involved in decompressing Snappy and
> compressing ZLib.
>
> Since you're running with `local[*]`, you process multiple tasks
> simultaneously, so they all might consume memory.
>
> I don't think that increasing the heap will help, since it looks like
> you're hitting system memory limits.
>
> I'd suggest trying to run with `local[2]` and checking the memory usage
> of the JVM process.
>
> On Mon, Nov 13, 2017 at 7:22 PM, Alec Swan wrote:
> > Hello,
> >
> > I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> > format. Effectively, my Java service starts up an embedded Spark cluster
> > (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> > keep getting OOM errors with large (~1GB) files.
> >
> > I've tried different ways to reduce memory usage, e.g. by partitioning
> > data with dataSet.partitionBy("customer").save(filePath), or capping
> > memory usage by setting spark.executor.memory=1G, but to no avail.
> >
> > I am wondering if there is a way to avoid OOM besides splitting the
> > source JSON file into multiple smaller ones and processing them
> > individually? Does Spark SQL have to read the JSON/Snappy (row-based)
> > file in its entirety before converting it to ORC (columnar)? If so,
> > would it make sense to create a custom receiver that reads the Snappy
> > file and use Spark Streaming for the ORC conversion?
> >
> > Thanks,
> >
> > Alec
Restart Spark Streaming after deployment
Hi,

I am new to Spark Streaming. I have developed one Spark Streaming job which runs every 30 minutes with a checkpointing directory. I have to implement a minor change; shall I kill the Spark Streaming job once the batch is completed, using the yarn application -kill command, and then update the jar file?

The question I have is: if I follow the above approach, will Spark Streaming pick up data from the offsets saved in the checkpoint after the restart? Is there any better approach?

Thanks in advance for your suggestions.

Thanks,
Asmath
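For the checkpoint-recovery half of the question, the API that restores offsets and state from the checkpoint directory is JavaStreamingContext.getOrCreate; a minimal sketch (checkpoint path, master and app name are hypothetical) follows. One caveat worth testing before relying on it: recovery deserializes the checkpointed DAG, so it generally only survives jar updates whose classes are serialization-compatible with the old code.

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class RestartFromCheckpoint {
    private static final String CHECKPOINT_DIR = "hdfs:///tmp/app-checkpoint";

    public static void main(String[] args) {
        // If CHECKPOINT_DIR holds a checkpoint, the context (including saved
        // offsets and state) is rebuilt from it; otherwise the factory runs.
        JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(
                CHECKPOINT_DIR, RestartFromCheckpoint::createContext);
        ssc.start();
        ssc.awaitTermination();
    }

    private static JavaStreamingContext createContext() {
        JavaStreamingContext ssc = new JavaStreamingContext(
                "yarn", "my-streaming-app", Durations.minutes(30));
        ssc.checkpoint(CHECKPOINT_DIR);
        // Define the DStream pipeline here, inside the factory, so it is
        // part of what gets checkpointed.
        return ssc;
    }
}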
Re: Process large JSON file without causing OOM
There's a lot of off-heap memory involved in decompressing Snappy and compressing ZLib.

Since you're running with `local[*]`, you process multiple tasks simultaneously, so they all might consume memory.

I don't think that increasing the heap will help, since it looks like you're hitting system memory limits.

I'd suggest trying to run with `local[2]` and checking the memory usage of the JVM process.

On Mon, Nov 13, 2017 at 7:22 PM, Alec Swan wrote:
> Hello,
>
> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> format. Effectively, my Java service starts up an embedded Spark cluster
> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> keep getting OOM errors with large (~1GB) files.
>
> I've tried different ways to reduce memory usage, e.g. by partitioning
> data with dataSet.partitionBy("customer").save(filePath), or capping
> memory usage by setting spark.executor.memory=1G, but to no avail.
>
> I am wondering if there is a way to avoid OOM besides splitting the
> source JSON file into multiple smaller ones and processing them
> individually? Does Spark SQL have to read the JSON/Snappy (row-based)
> file in its entirety before converting it to ORC (columnar)? If so,
> would it make sense to create a custom receiver that reads the Snappy
> file and use Spark Streaming for the ORC conversion?
>
> Thanks,
>
> Alec
Re: Process large JSON file without causing OOM
On 14 Nov 2017, at 15:32, Alec Swan <alecs...@gmail.com> wrote:

> But I wonder if there is a way to stream/batch the content of the JSON file
> in order to convert it to ORC piecemeal and avoid reading the whole JSON
> file into memory in the first place?

That is what you'll need to do; you'd hit similar problems if you had the same files, same allocated JVM space and the same number of threads trying to read in the files.

Jackson has a streaming API: http://www.baeldung.com/jackson-streaming-api
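To make that pointer concrete, here is a minimal sketch of the streaming read (the file name and the "customer" field are hypothetical, echoing the partitioning column mentioned earlier in the thread). The parser walks the document token by token, so only the current token, never the whole file, has to fit on the heap:

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

import java.io.File;
import java.io.IOException;

public class StreamingJsonReader {
    public static void main(String[] args) throws IOException {
        JsonFactory factory = new JsonFactory();
        try (JsonParser parser = factory.createParser(new File("records.json"))) {
            while (parser.nextToken() != null) {
                if (parser.getCurrentToken() == JsonToken.FIELD_NAME
                        && "customer".equals(parser.getCurrentName())) {
                    parser.nextToken();
                    String customer = parser.getText();
                    // Accumulate a bounded batch of records here, then hand
                    // the batch to Spark SQL for the ORC write.
                }
            }
        }
    }
}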
spark structured csv file stream not detecting new files
Greetings,

I am running a unit test designed to stream from a folder into which I am manually copying csv files. The files do not always get picked up; they only get detected when the job starts with the files already in the folder. I even tried using the fileNameOnly option newly included in 2.2.0. Have I missed something in the documentation? This problem does not seem to occur in the DStreams examples.

DataStreamReader reader = spark.readStream()
        .option("fileNameOnly", true)
        .option("header", true)
        .schema(userSchema);
Dataset<Row> csvDF = reader.csv(watchDir);
Dataset<Row> results = csvDF.groupBy("myCol").count();
MyForEach forEachObj = new MyForEach();
query = results
        .writeStream()
        .foreach(forEachObj) // foreach never gets called
        .outputMode("complete")
        .start();

--
I.R
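One thing worth checking before anything else: the Structured Streaming guide asks that input files be placed in the watched directory atomically (on most file systems, via a move/rename), because the source lists the directory each trigger and a file that is still being copied can be seen partially written and then never reconsidered. A small sketch of dropping a file that way (paths hypothetical):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class AtomicDrop {
    public static void main(String[] args) throws IOException {
        // Stage the file outside the watched folder, then move it in as a
        // single atomic operation so the stream never sees a partial file.
        Path staged = Paths.get("/data/staging/batch-0001.csv");
        Path watched = Paths.get("/data/watch/batch-0001.csv");
        Files.move(staged, watched, StandardCopyOption.ATOMIC_MOVE);
    }
}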