get partition locations in spark

2017-05-25 Thread girish hilage
Hi, In order to get the preferred locations for partitions I executed the statement below, r1.preferredLocations(part); but it returned an empty List(). How can I print the hostnames the partition is likely to be on? Regards, Girish
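A minimal sketch of printing those hostnames (assuming sc is an active SparkContext and the path is hypothetical; an RDD built with sc.parallelize has no locality information, so its preferred locations are legitimately empty, while an RDD read from HDFS usually reports the datanodes holding each block):

    import org.apache.spark.rdd.RDD
    val r1: RDD[String] = sc.textFile("hdfs:///some/path")  // hypothetical input
    r1.partitions.foreach { part =>
      // preferredLocations returns Seq[String]; it is empty when Spark
      // has no locality information for the partition
      println(s"partition ${part.index}: ${r1.preferredLocations(part).mkString(", ")}")
    }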

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
I don't know what happened in your case, so I cannot provide any workaround. It would be great if you could provide the logs output by HDFSBackedStateStoreProvider. On Thu, May 25, 2017 at 4:05 PM, kant kodali wrote: > > On Thu, May 25, 2017 at 3:41 PM, Shixiong(Ryan) Zhu < >
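For reference, a hedged sketch of how such logs could be turned on (an assumption, not something stated in the thread; in Spark 2.1 the provider lives under org.apache.spark.sql.execution.streaming.state, so a line like this in conf/log4j.properties should capture it):

    # assumption: enable debug logging for the HDFS-backed state store package
    log4j.logger.org.apache.spark.sql.execution.streaming.state=DEBUG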

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
On Thu, May 25, 2017 at 3:41 PM, Shixiong(Ryan) Zhu wrote: > bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/* > Hi, there are no files under /usr/local/hadoop/checkpoint/state/0/* (per bin/hadoop fs -ls), but all the directories until

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
Feel free to create a new ticket. Could you also provide the files in "/usr/local/hadoop/checkpoint/state/0" (Just run "bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/*") in the ticket and the Spark logs? On Thu, May 25, 2017 at 2:53 PM, kant kodali wrote: > Should I

shuffle write is very slow

2017-05-25 Thread KhajaAsmath Mohammed
Hi, I am converting a Hive job to a Spark job. I have tested on a small data set and the logic is correct in both Hive and Spark. When I started testing on large data, Spark is very slow compared to Hive; the shuffle write is taking a long time. Any suggestions? I am creating a temporary table in Spark and
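The thread does not show the job itself, but as a hedged starting point, two common levers for slow shuffle writes are the shuffle partition count and pre-partitioning on the join/group key (the tables and column below are hypothetical):

    import org.apache.spark.sql.functions.col
    // default is 200 shuffle partitions; too few for large inputs inflates
    // per-task shuffle write volume
    spark.conf.set("spark.sql.shuffle.partitions", "400")
    val result = dfA.repartition(col("join_key")).join(dfB, "join_key")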

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Should I file a ticket or should I try another version like Spark 2.2, since I am currently using 2.1.1? On Thu, May 25, 2017 at 2:38 PM, kant kodali wrote: > Hi Ryan, > > You are right, I was setting checkpointLocation for readStream. Now I did > set it for writeStream as

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Hi Ryan, You are right, I was setting checkpointLocation for readStream. Now I did set it for writeStream as well, like below: StreamingQuery query = df2.writeStream().foreach(new KafkaSink()).option("checkpointLocation", "/usr/local/hadoop/checkpoint").outputMode("update").start();

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
Read your code again and found one issue: you set "checkpointLocation" in `readStream`. It should be set in `writeStream`. However, I still have no idea why using a temp checkpoint location would fail. On Thu, May 25, 2017 at 2:23 PM, kant kodali wrote: > I did the following >
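A minimal Scala sketch of the corrected placement (the broker, topic, and KafkaSink writer are stand-ins based on the thread; the point is that only writeStream carries the option):

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
      .option("subscribe", "mytopic")                      // hypothetical topic
      .load()                                              // no checkpointLocation here
    val query = df.writeStream
      .foreach(new KafkaSink())                            // the thread's ForeachWriter
      .option("checkpointLocation", "/usr/local/hadoop/checkpoint")
      .outputMode("update")
      .start()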

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
I did the following: bin/hadoop fs -mkdir -p /usr/local/hadoop/checkpoint, and then bin/hadoop fs -ls /, and I can actually see /tmp and /usr, and inside of /usr there is indeed local/hadoop/checkpoint. So up to here it looks fine. I also cleared everything under /tmp/* as @Michael

Re: Structured Streaming from Parquet

2017-05-25 Thread upendra 1991
Paul, Did you try writing to disk rather than keeping it in memory? When files are large, depending on which of quality (performance) or quantity you want, writing to disk would take the load off the executors and pass it to the stage where you format your data in app2. Other options are to use Kafka

Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Ram Navan
Thank you, Steffen and Nicholas. I specified the schema to spark.read.json() and the time to execute this instruction got reduced from the original 8 minutes to 4 minutes! I also see only two jobs (instead of three when calling with no schema) created. Please refer to attachments job0 and job2 from

Re: Structured Streaming from Parquet

2017-05-25 Thread Burak Yavuz
Hi Paul, From what you're describing, it seems that stream1 is possibly generating tons of small files and stream2 is OOMing because it tries to maintain an in-memory list of files. Some notes/questions: 1. Parquet files are splittable, therefore having large parquet files shouldn't be a

Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Nicholas Hakobian
If you do not specify a schema, then the json() function will attempt to determine the schema, which requires a full scan of the file. Any subsequent actions will again have to read in the data. See the documentation at:
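A sketch of the difference (field names are hypothetical; with an explicit schema the inference scan over the files is skipped entirely):

    import org.apache.spark.sql.types._
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("ts", TimestampType),
      StructField("value", DoubleType)))
    // without .schema(...), spark.read.json first scans all files to infer types
    val df = spark.read.schema(schema).json("s3a://bucket/events/") // hypothetical path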

Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Steffen Schmitz
Hi Ram, spark.read.json() should be evaluated on the first call of .count(). It should then be read into memory once and the rows counted. After this operation it will be in memory and access will be faster. If you add println statements in between your function calls you should see

Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Ram Navan
Hi Steffen, Thanks for your response. Isn't spark.read.json() an action function? It reads the files from the source directory, infers the schema, and creates a dataframe, right? dataframe.cache() prints out this schema as well. I am not sure why dataframe.count() will try to do the same thing

access error while trying to run distcp from source cluster

2017-05-25 Thread nancy henry
Hi Team, I am trying to copy data from cluster A to cluster B, with the same user for both. I am running the distcp command on source cluster A but I am getting an error: 17/05/25 07:24:08 INFO mapreduce.Job: Running job: job_1492549627402_344485 17/05/25 07:24:17 INFO mapreduce.Job: Job

Structured Streaming from Parquet

2017-05-25 Thread Paul Corley
I have a Spark Structured Streaming process that is implemented in 2 separate streaming apps. The first app reads .gz files, which range in size from 1GB to 9GB compressed, in from s3, filters out invalid records, repartitions the data, and outputs to parquet on s3, partitioned the same as the

user-unsubscr...@spark.apache.org

2017-05-25 Thread williamtellme123
From: Steffen Schmitz [mailto:steffenschm...@hotmail.de] Sent: Thursday, May 25, 2017 3:34 AM To: ramnavan Cc: user@spark.apache.org Subject: Re: Questions regarding Jobs, Stages and Caching

unsubscribe

2017-05-25 Thread 信息安全部
unsubscribe

RE: strange warning

2017-05-25 Thread Mendelson, Assaf
Some more info: It seems this is caused by a complex data structure. Consider the following simple example:

    case class A(v: Int)
    case class B(v: A)
    val filename = "test"
    val a = A(1)
    val b = B(a)
    val df1: DataFrame = Seq[B](b).toDF
    df1.write.parquet(filename)
    val df2 =
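The message is cut off at the read; presumably the truncated line reads the file back, which is the step that triggers ParquetRecordReader and the warning. A hedged completion:

    val df2 = spark.read.parquet(filename) // assumption: reads back what df1 wrote
    df2.show()                             // action that actually opens the parquet file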

Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Steffen Schmitz
Hi Ram, Regarding your caching question: the data frame is evaluated lazily. That means it isn't cached directly on the invocation of .cache(), but on the first action called on it (in your case, count). Then it is loaded into memory and the rows are counted, not on the call of .cache(). On the
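A short illustration of that laziness (assuming a DataFrame df is already defined):

    df.cache()   // lazy: only marks the plan for caching, nothing is read yet
    df.count()   // first action: reads the data, materializes the cache, counts
    df.count()   // subsequent action: served from the in-memory cache, much faster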

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Executing bin/hadoop fs -ls /usr/local/hadoop/checkpoint says: ls: `/usr/local/hadoop/checkpoint': No such file or directory. This is what I expected as well, since I don't see any checkpoint directory under /usr/local/hadoop. Am I missing any configuration variable like HADOOP_CONF_DIR? I am

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread kant kodali
Hi Ryan, I did add that print statement and here is what I got. class org.apache.hadoop.hdfs.DistributedFileSystem Thanks! On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu < shixi...@databricks.com> wrote: > I meant using HDFS command to check the directory. Such as "bin/hadoop fs > -ls

Re: Sharing my DataFrame (DataSet) cheat sheet.

2017-05-25 Thread Yan Facai
Thanks, Yuhao. Similarly, I wrote a 10_minuters_to_spark_dataframe doc to share the code snippets I collected myself. + https://github.com/facaiy/Spark-for-the-Impatient/blob/master/doc/10_minuters_to_spark_dataframe.md + https://facaiy.github.io/misc/2017/05/24/collection-of-spark-doc.html I hope

strange warning

2017-05-25 Thread Mendelson, Assaf
Hi all, Today, I got the following warning: [WARN] org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl This occurs on one of my tests but not on

Re: Running into the same problem as JIRA SPARK-19268

2017-05-25 Thread Shixiong(Ryan) Zhu
I meant using an HDFS command to check the directory, such as "bin/hadoop fs -ls /usr/local/hadoop/checkpoint". My hunch is that the default file system in the driver is probably the local file system. Could you add the following line into your code to print the default file system?
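Presumably something along these lines (the exact line is cut off in the digest; FileSystem.get resolves fs.defaultFS from the driver's Hadoop configuration, which matches the "class org.apache.hadoop.hdfs.DistributedFileSystem" output reported elsewhere in the thread):

    import org.apache.hadoop.fs.FileSystem
    println(FileSystem.get(spark.sparkContext.hadoopConfiguration).getClass)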

One question / kerberos, yarn-cluster -> connection to hbase

2017-05-25 Thread sudhir37
Facing an issue with a Kerberos-enabled Hadoop/CDH cluster. We are trying to run a streaming job on yarn-cluster, which interacts with Kafka (direct stream) and HBase. Somehow, we are not able to connect to HBase in cluster mode. We use a keytab to log in to HBase. This is what we do:
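The message is truncated before the code, but a hedged sketch of the usual keytab login for HBase access follows (principal and keytab path are placeholders; the connection uses the standard hbase-client API):

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.security.UserGroupInformation

    val conf = HBaseConfiguration.create()
    UserGroupInformation.setConfiguration(conf)
    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
      "appuser@EXAMPLE.COM", "/etc/security/keytabs/appuser.keytab") // placeholders
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        val connection = ConnectionFactory.createConnection(conf) // Kerberos-authenticated
        try { /* read/write HBase here */ } finally { connection.close() }
      }
    })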