Request for Assistance: Adding User Authentication to Apache Spark Application

2024-05-16 Thread NIKHIL RAJ SHRIVASTAVA
Dear Team,

I hope this email finds you well. My name is Nikhil Raj, and I am currently
working with Apache Spark on one of my projects, in which we create an
external table in Spark from a Parquet file.

I am reaching out to seek assistance regarding user authentication for our
Apache Spark application. Currently, we can connect to the application
using only the host and port information. However, for security reasons, we
would like to implement user authentication to control access and ensure
data integrity.

After reviewing the available documentation and resources, I found that
adding user authentication to our Spark setup requires additional
configurations or plugins. However, I'm facing challenges in understanding
the exact steps or best practices to implement this.

Could you please provide guidance or point me towards relevant
documentation/resources that detail how to integrate user authentication
into Apache Spark?  Additionally, if there are any recommended practices or
considerations for ensuring the security of our Spark setup, we would
greatly appreciate your insights on that as well.
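
(For reference, a minimal sketch of Spark's built-in shared-secret
authentication, which is the first knob the security docs point at. Note that
this secures communication between Spark processes rather than providing an
end-user login for a JDBC/Thrift-style endpoint; user-facing logins there are
usually configured via hive.server2.authentication, e.g. LDAP or Kerberos.
The environment variable name below is an assumption.)

import org.apache.spark.sql.SparkSession

// Sketch: enable Spark's internal RPC authentication with a shared secret.
val spark = SparkSession.builder()
  .appName("secured-app")
  .config("spark.authenticate", "true")
  .config("spark.authenticate.secret", sys.env("SPARK_AUTH_SECRET")) // assumed env var
  .getOrCreate()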

Your assistance in this matter would be invaluable to us, as we aim to
enhance the security of our Spark application and safeguard our data
effectively.

Thank you very much for your time and consideration. I look forward to
hearing your suggestions.

Warm regards,

NIKHIL RAJ
Developer
Estuate Software Pvt. Ltd.


S3 committer for dynamic partitioning

2024-03-05 Thread Nikhil Goyal
Hi folks,
We have been following this doc
<https://spark.apache.org/docs/3.5.1/cloud-integration.html#hadoop-s3a-committers>
for writing data from a Spark job to S3. However, it fails when writing to
dynamic partitions. Any suggestions on what config should be used to avoid
the cost of renaming in S3?
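
(For later readers, a hedged sketch of the committer configuration that doc
describes; exact behavior depends on the Hadoop/S3A versions in use, so treat
it as a starting point rather than a verified fix. Note that the cloud
committers generally do not support
spark.sql.sources.partitionOverwriteMode=dynamic, which is a common reason
dynamic-partition writes fail; the "partitioned" staging committer with
conflict-mode=replace is the closest equivalent.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.committer.name", "partitioned")
  .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()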

Thanks
Nikhil


Re: Architecture of Spark Connect

2023-12-14 Thread Nikhil Goyal
If multiple applications are running, would we need multiple Spark Connect
servers? If so, is the user responsible for creating these servers, or are
they created on the fly when the user requests a new Spark session?

On Thu, Dec 14, 2023 at 10:28 AM Nikhil Goyal  wrote:

> Hi folks,
> I am trying to understand one question. Does Spark Connect create a new
> driver in the backend for every user, or is there a fixed number of
> drivers to which requests are sent?
>
> Thanks
> Nikhil
>


Architecture of Spark Connect

2023-12-14 Thread Nikhil Goyal
Hi folks,
I am trying to understand one question. Does Spark Connect create a new
driver in the backend for every user, or is there a fixed number of drivers
to which requests are sent?

Thanks
Nikhil


Shuffle data on pods which get decommissioned

2023-06-20 Thread Nikhil Goyal
Hi folks,
When running Spark on K8s, what happens to shuffle data if an executor is
terminated or lost? Since there is no shuffle service, does all the work
done by that executor get recomputed?
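
(For later readers, a hedged sketch: without an external shuffle service,
shuffle blocks lost with an executor are indeed recomputed, but Spark 3.1+,
worth verifying for your release, can migrate blocks off a pod that is being
decommissioned before it disappears.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.decommission.enabled", "true")
  .config("spark.storage.decommission.enabled", "true")
  .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .config("spark.storage.decommission.rddBlocks.enabled", "true")
  .getOrCreate()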

Thanks
Nikhil


Viewing UI for spark jobs running on K8s

2023-05-31 Thread Nikhil Goyal
Hi folks,
Is there an equivalent of the YARN RM page for Spark on Kubernetes? We can
port-forward the UI from the driver pod for each job, but this process is
tedious given we have multiple jobs running. Is there a clever solution for
exposing all driver UIs in a centralized place?

Thanks
Nikhil


Re: Partition by on dataframe causing a Sort

2023-04-20 Thread Nikhil Goyal
Is it possible to use MultipleOutputs, define a custom OutputFormat, and
then use `saveAsHadoopFile` to achieve this?

On Thu, Apr 20, 2023 at 1:29 PM Nikhil Goyal  wrote:

> Hi folks,
>
> We are writing a dataframe and doing a partitionBy() on it.
> df.write.partitionBy('col').parquet('output')
>
> The job is running super slow because internally, per partition, it does a
> sort before starting to output to the final location. This sort isn't
> useful in any way since the number of files will remain the same. I was
> wondering if we can have Spark just open multiple file pointers, keep
> appending data as it arrives, and close all the pointers when it's done.
> This would reduce the memory footprint and speed up performance by
> eliminating the sort. We could implement a custom source, but we cannot
> see whether we can really control this behavior in the sink. If anyone has
> any suggestions, please let me know.
>
> Thanks
> Nikhil
>


Partition by on dataframe causing a Sort

2023-04-20 Thread Nikhil Goyal
Hi folks,

We are writing a dataframe and doing a partitionBy() on it.
df.write.partitionBy('col').parquet('output')

The job is running super slow because internally, per partition, it does a
sort before starting to output to the final location. This sort isn't
useful in any way since the number of files will remain the same. I was
wondering if we can have Spark just open multiple file pointers, keep
appending data as it arrives, and close all the pointers when it's done.
This would reduce the memory footprint and speed up performance by
eliminating the sort. We could implement a custom source, but we cannot see
whether we can really control this behavior in the sink. If anyone has any
suggestions, please let me know.
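
(For later readers, a hedged sketch: newer releases, roughly Spark 3.2+ and
worth verifying for your version, can keep several partition writers open at
once and only fall back to sorting when the limit below is exceeded, which is
essentially the behavior asked for here. The dataframe and output path are
stand-ins.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .config("spark.sql.maxConcurrentOutputFileWriters", "100")
  .getOrCreate()

val df = spark.range(1000).withColumn("col", col("id") % 10) // stand-in dataframe
df.write.partitionBy("col").parquet("output") // hypothetical path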

Thanks
Nikhil


Understanding executor memory behavior

2023-03-16 Thread Nikhil Goyal
Hi folks,
I am trying to understand what the difference would be between running 8G
1-core executors and 40G 5-core executors. I see that on YARN it can cause
bin-packing issues, but other than that are there any pros and cons of
using either?

Thanks
Nikhil


Increasing Spark history resources

2022-12-08 Thread Nikhil Goyal
Hi folks,
We are experiencing slowness in the Spark history server, and are trying to
find which config properties we can tune to fix the issue. I found that
SPARK_DAEMON_MEMORY is used to control memory; similarly, is there a config
property to increase the number of threads?

Thanks
Nikhil


Driver takes long time to finish once job ends

2022-11-22 Thread Nikhil Goyal
Hi folks,
We are running a job on our on-prem K8s cluster but writing the output to
S3. We noticed that all the executors finish in < 1h, but the driver takes
another 5h to finish. Logs:

22/11/22 02:08:29 INFO BlockManagerInfo: Removed broadcast_3_piece0 on
10.42.145.11:39001 in memory (size: 7.3 KiB, free: 9.4 GiB)
22/11/22 *02:08:29* INFO BlockManagerInfo: Removed broadcast_3_piece0
on 10.42.137.10:33425 in memory (size: 7.3 KiB, free: 9.4 GiB)
22/11/22 *04:57:46* INFO FileFormatWriter: Write Job
4f0051fc-dda9-457f-a072-26311fd5e132 committed.
22/11/22 04:57:46 INFO FileFormatWriter: Finished processing stats for
write job 4f0051fc-dda9-457f-a072-26311fd5e132.
22/11/22 04:57:47 INFO FileUtils: Creating directory if it doesn't
exist: 
s3://rbx.usr/masked/dw_pii/creator_analytics_user_universe_first_playsession_dc_ngoyal/ds=2022-10-21
22/11/22 04:57:48 INFO SessionState: Could not get hdfsEncryptionShim,
it is only applicable to hdfs filesystem.
22/11/22 *04:57:48* INFO SessionState: Could not get
hdfsEncryptionShim, it is only applicable to hdfs filesystem.
22/11/22 *07:20:20* WARN ExecutorPodsWatchSnapshotSource: Kubernetes
client has been closed (this is expected if the application is
shutting down.)
22/11/22 07:20:22 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
22/11/22 07:20:22 INFO MemoryStore: MemoryStore cleared
22/11/22 07:20:22 INFO BlockManager: BlockManager stopped
22/11/22 07:20:22 INFO BlockManagerMaster: BlockManagerMaster stopped
22/11/22 07:20:22 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
22/11/22 07:20:22 INFO SparkContext: Successfully stopped SparkContext
22/11/22 07:20:22 INFO ShutdownHookManager: Shutdown hook called
22/11/22 07:20:22 INFO ShutdownHookManager: Deleting directory
/tmp/spark-d9aa302f-86f2-4668-9c01-07b3e71cba82
22/11/22 07:20:22 INFO ShutdownHookManager: Deleting directory
/var/data/spark-5295849e-a0f3-4355-9a6a-b510616aefaa/spark-43772336-8c86-4e2b-839e-97b2442b2959
22/11/22 07:20:22 INFO MetricsSystemImpl: Stopping s3a-file-system
metrics system...
22/11/22 07:20:22 INFO MetricsSystemImpl: s3a-file-system metrics
system stopped.
22/11/22 07:20:22 INFO MetricsSystemImpl: s3a-file-system metrics
system shutdown complete.

It seems the job is spending all this extra time committing the write to
S3. Any idea how to fix this issue?
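
(A sketch only, for later readers: the long gap before "Write Job ...
committed" is typically the single-threaded driver-side commit, where
FileOutputCommitter v1 renames every task's output and a rename on S3 is a
copy plus delete. Algorithm v2 commits task output as tasks finish, though it
is still not atomic on S3; the S3A committers from the cloud-integration docs
are the more complete fix. Verify against your Hadoop/S3A versions.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()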

Thanks


Dynamic allocation on K8s

2022-10-25 Thread Nikhil Goyal
Hi folks,
When running Spark on Kubernetes, is it possible to use dynamic allocation?
Some blog posts
<https://spot.io/blog/setting-up-managing-monitoring-spark-on-kubernetes/>
mention that dynamic allocation is available; however, I am not sure how it
works. The official Spark docs
<https://spark.apache.org/docs/latest/running-on-kubernetes.html#future-work>
say that the shuffle service is not yet available.
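
(For later readers, a hedged sketch: since Spark 3.0, dynamic allocation can
run on K8s without an external shuffle service by tracking shuffle files on
executors and only releasing executors whose shuffle data is no longer
needed. The executor counts are illustrative values, not recommendations.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "50")
  .getOrCreate()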

Thanks
Nikhil


partitionBy creating a lot of small files

2022-06-04 Thread Nikhil Goyal
Hi all,

Is there a way to use dataframe.partitionBy("col") and control the number
of output files without doing a full repartition? The thing is, some
partitions have more data while some have less. Doing a .repartition is a
costly operation. We want to control the size of the output files. Is it
even possible?
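
(One pattern that gets close, as a hedged sketch; the dataframe and output
path below are stand-ins. It still shuffles once, but by the partition column
rather than to a fixed count.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()
val df = spark.range(1000).withColumn("col", col("id") % 10) // stand-in dataframe

// Keyed by the partition column, each partition value lands in a single task
// and hence a single file; heavily skewed values would need an extra salt
// column on top of this.
df.repartition(col("col"))
  .write
  .partitionBy("col")
  .parquet("output") // hypothetical path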

Thanks


Re: PartitionBy and SortWithinPartitions

2022-06-03 Thread Nikhil Goyal
Hi Enrico,
Thanks for replying. I want to partition by a column and then be able to
sort within those partitions based on another column. DataFrameWriter has
sortBy and bucketBy, but they require creating a new table (you can only
use `saveAsTable`, not plain `save`). I could write another job on top that
does the sorting, but that complicates the code. So is there a clever way
to sort records after they have been partitioned?
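
(One trick that seems to work, as a hedged sketch; the data and path below
are made up, and "col1"/"col2" mirror the question.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq((3, "a", 1.0), (1, "a", 2.0), (2, "b", 3.0)).toDF("col1", "col2", "value")

// Route each col2 value to one task, then sort that task's rows by
// (col2, col1) before writing; the partitioned write then keeps the col1
// order inside every written col2 directory without an extra sort of its own.
df.repartition(col("col2"))
  .sortWithinPartitions("col2", "col1")
  .write
  .mode("overwrite")
  .partitionBy("col2")
  .parquet("/tmp/partitioned_output") // hypothetical path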

Thanks
Nikhil

On Fri, Jun 3, 2022 at 9:38 AM Enrico Minack  wrote:

> Nikhil,
>
> What are you trying to achieve with this in the first place? What are your
> goals? What is the problem with your approach?
>
> Are you concerned about the 1000 files in each written col2-partition?
>
> The write.partitionBy is something different that df.repartition or
> df.coalesce.
>
> The df partitions are sorted *before* partitionBy-writing them.
>
> Enrico
>
>
> Am 03.06.22 um 16:13 schrieb Nikhil Goyal:
>
> Hi folks,
>
> We are trying to do
> `
> df.coalesce(1000).sortWithinPartitions("col1").write.mode('overwrite').partitionBy("col2").parquet(...)
> `
>
> I do see that coalesce(1000) is applied to every sub-partition. But I
> wanted to know whether sortWithinPartitions(col1) applies after
> partitionBy or before. Basically, would Spark first partition by col2 and
> then sort by col1, or sort first and then partition?
>
> Thanks
> Nikhil
>
>
>


PartitionBy and SortWithinPartitions

2022-06-03 Thread Nikhil Goyal
Hi folks,

We are trying to do
`
df.coalesce(1000).sortWithinPartitions("col1").write.mode('overwrite').partitionBy("col2").parquet(...)
`

I do see that coalesce(1000) is applied to every sub-partition. But I
wanted to know whether sortWithinPartitions(col1) applies after partitionBy
or before. Basically, would Spark first partition by col2 and then sort by
col1, or sort first and then partition?

Thanks
Nikhil


Serialization error when using scala kernel with Jupyter

2020-02-21 Thread Nikhil Goyal
Hi all,
I am trying to use the almond Scala kernel to run a Spark session on
Jupyter. I am using Scala version 2.12.8. I am creating the Spark session
with master set to YARN.
This is the code:

val rdd = spark.sparkContext.parallelize(Seq(1, 2, 4))
rdd.map(x => x + 1).collect()

Exception:

java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in
instance of org.apache.spark.rdd.MapPartitionsRDD


I was wondering if anyone has seen this before.

Thanks
Nikhil


Understanding deploy mode config

2019-10-02 Thread Nikhil Goyal
Hi all,

In a PySpark application, is the Python process the driver, or will Spark
start a new driver process? If it is the driver, then how does specifying
"spark.submit.deployMode" as "cluster" in the Spark conf come into use?

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("yarn")
        .set("spark.submit.deployMode", "cluster"))
sc = SparkContext(conf=conf)

Is the SparkContext being created on the application master or on the
machine where this Python process is being run?

Thanks
Nikhil


unsubscribe

2019-05-24 Thread Nikhil R Patil
unsubscribe
"Confidentiality Warning: This message and any attachments are intended only 
for the use of the intended recipient(s). 
are confidential and may be privileged. If you are not the intended recipient. 
you are hereby notified that any 
review. re-transmission. conversion to hard copy. copying. circulation or other 
use of this message and any attachments is 
strictly prohibited. If you are not the intended recipient. please notify the 
sender immediately by return email. 
and delete this message and any attachments from your system.

Virus Warning: Although the company has taken reasonable precautions to ensure 
no viruses are present in this email. 
The company cannot accept responsibility for any loss or damage arising from 
the use of this email or attachment."


Re: K8s-Spark client mode : Executor image not able to download application jar from driver

2019-04-28 Thread Nikhil Chinnapa
Thanks for explaining in such detail and pointing to the source code.
Yes, its helpful and cleared lot of confusions. 






Re: K8s-Spark client mode : Executor image not able to download application jar from driver

2019-04-27 Thread Nikhil Chinnapa
Hi Stavros,

Thanks a lot for pointing me in the right direction. I got stuck on a
release, so I didn't get time earlier.

The mistake was "LINUX_APP_RESOURCE": I was using "local" when it should
be "file". I got there thanks to your email.

What I understood:
Driver image: $SPARK_HOME/bin, $SPARK_HOME/jars, and the application jar.
Executor image: just $SPARK_HOME/bin and $SPARK_HOME/jars will suffice.







K8s-Spark client mode : Executor image not able to download application jar from driver

2019-04-16 Thread Nikhil Chinnapa
Environment:
Spark: 2.4.0
Kubernetes:1.14

Query: Does the application jar need to be part of both the driver and
executor images?

Invocation point (from Java code):
sparkLaunch = new SparkLauncher()
        .setMaster(LINUX_MASTER)
        .setAppResource(LINUX_APP_RESOURCE)
        .setConf("spark.app.name", APP_NAME)
        .setMainClass(MAIN_CLASS)
        .setConf("spark.executor.instances", EXECUTOR_COUNT)
        .setConf("spark.kubernetes.container.image", CONTAINER_IMAGE)
        .setConf("spark.kubernetes.driver.pod.name", DRIVER_POD_NAME)
        .setConf("spark.kubernetes.container.image.pullSecrets", REGISTRY_SECRET)
        .setConf("spark.kubernetes.authenticate.driver.serviceAccountName", SERVICE_ACCOUNT_NAME)
        .setConf("spark.driver.host", SERVICE_NAME + "." + NAMESPACE + ".svc.cluster.local")
        .setConf("spark.driver.port", DRIVER_PORT)
        .setDeployMode("client");

Scenario:
I am trying to run Spark on K8s in client mode. When I put the application
jar in both the driver and executor images, the program works fine.

But if I put the application jar in the driver image only, I get the
following error:

2019-04-16 06:36:44 INFO  Executor:54 - Fetching
file:/opt/spark/examples/jars/reno-spark-codebase-0.1.0.jar with timestamp
1555396592768
2019-04-16 06:36:44 INFO  Utils:54 - Copying
/opt/spark/examples/jars/reno-spark-codebase-0.1.0.jar to
/var/data/spark-d24c8fbc-4fe7-4968-9310-f891a097d1e7/spark-31ba5cbb-3132-408c-991a-795
2019-04-16 06:36:44 ERROR Executor:91 - Exception in task 0.1 in stage 0.0
(TID 2)
java.nio.file.NoSuchFileException:
/opt/spark/examples/jars/reno-spark-codebase-0.1.0.jar
at
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at java.base/sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:548)
at
java.base/sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:254)
at java.base/java.nio.file.Files.copy(Files.java:1294)
at
org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$copyRecursive(Utils.scala:664)
at org.apache.spark.util.Utils$.copyFile(Utils.scala:635)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:719)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:805)
at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:797)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:797)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:369)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)









Need help with SparkSQL Query

2018-12-17 Thread Nikhil Goyal
Hi guys,

I have a dataframe of type Record (id: Long, timestamp: Long, isValid:
Boolean,  other metrics)

Schema looks like this:
root
 |-- id: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- isValid: boolean (nullable = true)
.

I need to find the earliest valid record per id. In the RDD world I could
do a groupBy on 'id' and find the earliest one, but I am not sure how to do
it in SQL. Since I am doing this in PySpark, I cannot really use the
Dataset API for this.

One thing I can do is groupBy 'id', find the earliest timestamp available
and then join with the original dataframe to get the right record (all the
metrics).

Or I can create a single column with all the records and then implement a
UDAF in Scala and use it in PySpark.

Neither solution seems straightforward. Is there a simpler one?
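
(A window-function sketch of this, written in Scala here, though the same
Window/row_number API exists in PySpark; the data is made up to match the
schema above.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val records = Seq((1L, 100L, true), (1L, 50L, false), (2L, 10L, true))
  .toDF("id", "timestamp", "isValid")

// Keep only valid rows, rank them per id by timestamp, keep rank 1.
val w = Window.partitionBy("id").orderBy(col("timestamp").asc)
val earliestValid = records
  .filter(col("isValid"))
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")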

Thanks
Nikhil


Re: Writing to vertica from spark

2018-10-22 Thread Nikhil Goyal
Fixed this by setting fileformat -> "parquet"
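
(For later readers, a sketch of how that fix plugs into the write shown in
the quoted message below; dataFrame and connectionProperties are the ones
from that message, and only the added "fileformat" entry is new.)

import org.apache.spark.sql.SaveMode

dataFrame
  .write
  .format("com.vertica.spark.datasource.DefaultSource")
  .options(connectionProperties + ("fileformat" -> "parquet"))
  .mode(SaveMode.Append)
  .save()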

On Mon, Oct 22, 2018 at 11:48 AM Nikhil Goyal  wrote:

> Hi guys,
>
> My code is failing with this error
>
> java.lang.Exception: S2V: FATAL ERROR for job S2V_job9197956021769393773.
> Job status information is available in the Vertica table
> test.S2V_JOB_STATUS_USER_NGOYAL.  Unable to save intermediate orc files to
> HDFS path:hdfs://hadoop-dw2-nn.smf1.com/tmp/S2V_job9197956021769393773.
> *Error message:org.apache.spark.sql.AnalysisException: The ORC data source
> must be used with Hive support enabled;*
>
> This is how I am writing it. I followed steps from this link
> <https://www.vertica.com/blog/integrating-apache-spark/>.
>
> dataFrame
>   .write
>   .format("com.vertica.spark.datasource.DefaultSource")
>   .options(connectionProperties)
>   .mode(SaveMode.Append)
>   .save()
>
> Does anybody have any idea how to fix this?
>
>
> Thanks
> Nikhil
>
>


Writing to vertica from spark

2018-10-22 Thread Nikhil Goyal
Hi guys,

My code is failing with this error

java.lang.Exception: S2V: FATAL ERROR for job S2V_job9197956021769393773.
Job status information is available in the Vertica table
test.S2V_JOB_STATUS_USER_NGOYAL.  Unable to save intermediate orc files to
HDFS path:hdfs://hadoop-dw2-nn.smf1.com/tmp/S2V_job9197956021769393773.
*Error message:org.apache.spark.sql.AnalysisException: The ORC data source
must be used with Hive support enabled;*

This is how I am writing it. I followed steps from this link
<https://www.vertica.com/blog/integrating-apache-spark/>.

dataFrame
  .write
  .format("com.vertica.spark.datasource.DefaultSource")
  .options(connectionProperties)
  .mode(SaveMode.Append)
  .save()

Does anybody have any idea how to fix this?


Thanks
Nikhil


Re: [External Sender] Writing dataframe to vertica

2018-10-22 Thread Nikhil Goyal
Got the issue fixed. I had to set the 'dbschema' param in the connection
options; by default it is set to public.
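
(A sketch of the connector options with that fix applied; every value except
the "dbschema" key itself is a made-up placeholder.)

val connectionProperties = Map(
  "host"     -> "vertica.example.com",
  "db"       -> "analytics",
  "user"     -> "ngoyal",
  "password" -> sys.env("VERTICA_PASSWORD"),
  "dbschema" -> "test" // without this the connector defaults to the public schema
)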

Thanks

On Tue, Oct 16, 2018 at 9:49 PM Femi Anthony 
wrote:

> How are you trying to write to Vertica ? Can you provide some snippets of
> code ?
>
>
> Femi
>
> On Tue, Oct 16, 2018 at 7:24 PM Nikhil Goyal  wrote:
>
>> Hi guys,
>>
>> I am trying to write dataframe to vertica using spark. It seems like
>> spark is creating a temp table under public schema. I don't have access to
>> public schema hence the job is failing. Is there a way to specify another
>> schema?
>>
>> Error
>> ERROR s2v.S2VUtils: createJobStatusTable: FAILED to create job status
>> table public.S2V_JOB_STATUS_USER_NGOYAL
>> java.lang.Exception: S2V: FATAL ERROR for job S2V_job8087339107009511230.
>> Unable to create status table for tracking this
>> job:public.S2V_JOB_STATUS_USER_NGOYAL
>>
>> Thanks
>> Nikhil
>>
>


Writing dataframe to vertica

2018-10-16 Thread Nikhil Goyal
Hi guys,

I am trying to write a dataframe to Vertica using Spark. It seems like
Spark is creating a temp table under the public schema. I don't have access
to the public schema, hence the job is failing. Is there a way to specify
another schema?

Error
ERROR s2v.S2VUtils: createJobStatusTable: FAILED to create job status table
public.S2V_JOB_STATUS_USER_NGOYAL
java.lang.Exception: S2V: FATAL ERROR for job S2V_job8087339107009511230.
Unable to create status table for tracking this
job:public.S2V_JOB_STATUS_USER_NGOYAL

Thanks
Nikhil


Driver OOM when using writing parquet

2018-08-06 Thread Nikhil Goyal
Hi guys,

I have a simple job which reads LZO Thrift files and writes them out in
Parquet format. The driver is going out of memory. The Parquet writer does
keep some metadata in memory, but that should make the executors run out of
memory, not the driver. No computation is being done on the driver. Any
idea what could be the reason?


Thanks
Nikhil


Zstd codec for writing dataframes

2018-06-18 Thread Nikhil Goyal
Hi guys,

I was wondering if there is a way to compress output files using zstd. It
seems zstd compression can be used for shuffle data; is there a way to use
it for the output data as well?
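
(For later readers, a hedged sketch: zstd for Parquet output needs a
Spark/Parquet build that ships the codec, roughly Spark 3.2+ / Parquet 1.12+,
so verify for your version. The output path is a placeholder.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.parquet.compression.codec", "zstd")
  .getOrCreate()

// Equivalent per-write form.
spark.range(1000).write.option("compression", "zstd").parquet("/tmp/zstd_output")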

Thanks
Nikhil


unsubscribe

2018-04-05 Thread Nikhil Kalbande
unsubscribe


Re: Class cast exception while using Data Frames

2018-03-27 Thread Nikhil Goyal
You can run this on spark shell

*CODE:*

case class InstanceData(service: String, metric: String, zone: String, source: String, time: Long, value: Double)

val seq = sc.parallelize(Seq(
  InstanceData("serviceA", "metricA", "zoneA", "sourceA", 1000L, 1.0),
  InstanceData("serviceA", "metricA", "zoneA", "hostA", 1000L, 1.0),
  InstanceData("serviceD", "metricA", "zoneB", "hostB", 1000L, 2.0),
  InstanceData("serviceA", "metricF", "zoneA", "hostB", 1000L, 1.0)
))

val instData = spark.createDataFrame(seq)

def makeMap = udf((service: String, metric: String, value: Double) => Map((service, metric) -> value))

val instDF = instData.withColumn("metricMap", makeMap($"service", $"metric", $"value"))

def avgMapValueUDF = udf((newMap: Map[(String, String), Double], count: Long) => {
  newMap.keys
    .map { keyTuple =>
      val sum = newMap.getOrElse(keyTuple, 0.0)
      (keyTuple, sum / count.toDouble)
    }.toMap
})

instDF.withColumn("customMap", avgMapValueUDF(col("metricMap"), lit(1))).show



On Mon, Mar 26, 2018 at 11:51 PM, Shmuel Blitz <shmuel.bl...@similarweb.com>
wrote:

> Hi Nikhil,
>
> Can you please put a code snippet that reproduces the issue?
>
> Shmuel
>
> On Tue, Mar 27, 2018 at 12:55 AM, Nikhil Goyal <nownik...@gmail.com>
> wrote:
>
>>  |-- myMap: map (nullable = true)
>>  ||-- key: struct
>>  ||-- value: double (valueContainsNull = true)
>>  |||-- _1: string (nullable = true)
>>  |||-- _2: string (nullable = true)
>>  |-- count: long (nullable = true)
>>
>> On Mon, Mar 26, 2018 at 1:41 PM, Gauthier Feuillen <gauth...@dataroots.io
>> > wrote:
>>
>>> Can you give the output of “printSchema” ?
>>>
>>>
>>> On 26 Mar 2018, at 22:39, Nikhil Goyal <nownik...@gmail.com> wrote:
>>>
>>> Hi guys,
>>>
>>> I have a Map[(String, String), Double] as one of my columns. Using
>>>
>>> input.getAs[Map[(String, String), Double]](0)
>>>
>>>  throws exception: Caused by: java.lang.ClassCastException: 
>>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be 
>>> cast to scala.Tuple2
>>>
>>> Even the schema says that key is of type struct of (string, string).
>>>
>>> Any idea why this is happening?
>>>
>>>
>>> Thanks
>>>
>>> Nikhil
>>>
>>>
>>>
>>
>
>
> --
> Shmuel Blitz
> Big Data Developer
> Email: shmuel.bl...@similarweb.com
> www.similarweb.com
>


Re: Class cast exception while using Data Frames

2018-03-26 Thread Nikhil Goyal
 |-- myMap: map (nullable = true)
 |    |-- key: struct
 |    |-- value: double (valueContainsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)
 |-- count: long (nullable = true)

On Mon, Mar 26, 2018 at 1:41 PM, Gauthier Feuillen <gauth...@dataroots.io>
wrote:

> Can you give the output of “printSchema” ?
>
>
> On 26 Mar 2018, at 22:39, Nikhil Goyal <nownik...@gmail.com> wrote:
>
> Hi guys,
>
> I have a Map[(String, String), Double] as one of my columns. Using
>
> input.getAs[Map[(String, String), Double]](0)
>
>  throws exception: Caused by: java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to scala.Tuple2
>
> Even the schema says that key is of type struct of (string, string).
>
> Any idea why this is happening?
>
>
> Thanks
>
> Nikhil
>
>
>


Class cast exception while using Data Frames

2018-03-26 Thread Nikhil Goyal
Hi guys,

I have a Map[(String, String), Double] as one of my columns. Using

input.getAs[Map[(String, String), Double]](0)

 throws exception: Caused by: java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot
be cast to scala.Tuple2

Even the schema says that key is of type struct of (string, string).

Any idea why this is happening?
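
(A hedged sketch of the usual workaround: inside Spark SQL a struct is
materialized as a Row, i.e. GenericRowWithSchema, not a Scala tuple, so read
the key as Row and unpack its fields manually. "input" is the Row from the
snippet above.)

import org.apache.spark.sql.Row

val m = input.getAs[Map[Row, Double]](0)
val asTuples: Map[(String, String), Double] =
  m.map { case (k, v) => (k.getString(0), k.getString(1)) -> v }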


Thanks

Nikhil


Using Thrift with Dataframe

2018-02-28 Thread Nikhil Goyal
Hi guys,

I have an RDD of Thrift structs. I want to convert it into a dataframe.
Can someone suggest how I can do this?
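
(One common approach, sketched with assumptions: map each Thrift struct to a
plain case class and use toDF. UserRecord, the getId/getName getters, and
thriftRdd stand in for the real types.)

import org.apache.spark.sql.SparkSession

case class UserRecord(id: Long, name: String)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = thriftRdd.map(t => UserRecord(t.getId, t.getName)).toDF()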

Thanks
Nikhil


Re: Job never finishing

2018-02-21 Thread Nikhil Goyal
Thanks for the help :)

On Tue, Feb 20, 2018 at 4:22 PM, Femi Anthony <femib...@gmail.com> wrote:

> You can use spark speculation as a way to get around the problem.
>
> Here is a useful link:
>
> http://asyncified.io/2016/08/13/leveraging-spark-
> speculation-to-identify-and-re-schedule-slow-running-tasks/
>
> Sent from my iPhone
>
> On Feb 20, 2018, at 5:52 PM, Nikhil Goyal <nownik...@gmail.com> wrote:
>
> Hi guys,
>
> I have a job which gets stuck if a couple of tasks get killed due to OOM
> exception. Spark doesn't kill the job and it keeps on running for hours.
> Ideally I would expect Spark to kill the job or restart the killed
> executors but nothing seems to be happening. Anybody got idea about this?
>
> Thanks
> Nikhil
>
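
(For later readers, the speculation settings discussed in the linked article
look roughly like this; the values are illustrative, not tuned
recommendations.)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.speculation", "true")
  .config("spark.speculation.quantile", "0.90") // fraction of tasks that must finish first
  .config("spark.speculation.multiplier", "3")  // how much slower than the median before re-launching
  .getOrCreate()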
>


Job never finishing

2018-02-20 Thread Nikhil Goyal
Hi guys,

I have a job which gets stuck if a couple of tasks get killed due to an OOM
exception. Spark doesn't kill the job and it keeps on running for hours.
Ideally I would expect Spark to fail the job or restart the killed
executors, but nothing seems to be happening. Has anybody got an idea about
this?

Thanks
Nikhil


GC issues with spark job

2018-02-18 Thread Nikhil Goyal
Hi,

I have a job which is spending approximately 30% of its time in GC. When I
looked at the logs, it seems GC is triggered before the spill happens. I
wanted to know if there is a config setting I can use to force Spark to
spill earlier, maybe when memory is 60-70% full.

Thanks
Nikhil


Question about DStreamCheckpointData

2017-01-25 Thread Nikhil Goyal
Hi,

I am using DStreamCheckpointData and it seems that Spark checkpoints data
even if the RDD processing fails. It seems to checkpoint at the moment it
creates the RDD rather than waiting for its completion. Does anybody know
how to make it wait until completion?

Thanks
Nikhil


ALS.trainImplicit block sizes

2016-10-21 Thread Nikhil Mishra
Hi,

I have a question about the block size to be specified in
ALS.trainImplicit() in PySpark (Spark 1.6.1). There is only one block-size
parameter to be specified. I want to know whether that results in
partitioning both the users axis and the items axis.

For example, I am using the following call to ALS.trainImplicit() in my
code.

---

RANK = 50

ITERATIONS = 2

BLOCKS = 1000

ALPHA = 1.0

model = ALS.trainImplicit(ratings, RANK, ITERATIONS, blocks=BLOCKS,
alpha=ALPHA)




Will this partition the users x items matrix into BLOCKS x BLOCKS number of
matrices or will it partition only the users axis thereby resulting in
BLOCKS number of matrices, each with columns = total number of unique items?

Thanks,
Nik


Streaming and Batch code sharing

2016-06-25 Thread Nikhil Goyal
Hi,

Does anyone have a good example where realtime and batch are able to share
the same code?
(Other than this one
https://github.com/databricks/reference-apps/blob/master/logs_analyzer/chapter1/reuse.md
)

Thanks
Nikhil


Re: Protobuf class not found exception

2016-05-31 Thread Nikhil Goyal
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-find-proto-buffer-class-error-with-RDD-lt-protobuf-gt-td14529.html

But has this been solved?

On Tue, May 31, 2016 at 3:26 PM, Nikhil Goyal <nownik...@gmail.com> wrote:

> I am getting this error when I am trying to create rdd of (protokey,
> value). When I change this to (*protokey.toString*, value) it works fine.
>
>
> *This is the stack trace:*
> java.lang.RuntimeException: Unable to find proto buffer class
> at
> com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
> at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
> at
> org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
> at
> org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
> at
> org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
> at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:58)
> at
> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:83)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> *Caused by: java.lang.ClassNotFoundException:
> com.rocketfuel.common.message.protobuf.generated.ApolloProtos$ApolloMvKey*
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:190)
> at
> com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:768)
> ... 28 more
>
>
>
> The class has been packaged into the jar and also doing *.toString* works
> fine.
> Does anyone has any idea on this?
>
> Thanks
> Nikhil
>


Protobuf class not found exception

2016-05-31 Thread Nikhil Goyal
I am getting this error when I am trying to create an RDD of (protokey,
value). When I change this to (*protokey.toString*, value) it works fine.


*This is the stack trace:*
java.lang.RuntimeException: Unable to find proto buffer class
at
com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at
org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
at
org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
at
org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:58)
at
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:83)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
*Caused by: java.lang.ClassNotFoundException:
com.rocketfuel.common.message.protobuf.generated.ApolloProtos$ApolloMvKey*
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:190)
at
com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:768)
... 28 more



The class has been packaged into the jar, and doing *.toString* works fine.
Does anyone have any idea about this?

Thanks
Nikhil


Re: Timed aggregation in Spark

2016-05-23 Thread Nikhil Goyal
I don't think this solves the problem. Here are the issues:
1) How do we push the entire data to Vertica? Opening a connection per
record will be too costly.
2) If a key doesn't come again, how do we push it to Vertica?
3) How do we schedule the dumping of data to avoid loading too much data in
state?
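
(On point (1), a common pattern is to open one connection per partition
inside foreachRDD rather than per record; a hedged sketch, where
aggregatedStream stands in for the DStream of aggregated (key, total) pairs
and the JDBC URL, credentials, and SQL are illustrative placeholders.)

import java.sql.DriverManager

aggregatedStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One connection per partition, reused for every record in it.
    val conn = DriverManager.getConnection(
      "jdbc:vertica://vertica.example.com:5433/analytics", "user", "password")
    val stmt = conn.prepareStatement("INSERT INTO metrics_agg (key, total) VALUES (?, ?)")
    try {
      records.foreach { case (key, total) =>
        stmt.setString(1, key)
        stmt.setDouble(2, total)
        stmt.executeUpdate()
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }
}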



On Mon, May 23, 2016 at 1:33 PM, Ofir Kerker <ofir.ker...@gmail.com> wrote:

> Yes, check out mapWithState:
>
> https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html
>
> _____
> From: Nikhil Goyal <nownik...@gmail.com>
> Sent: Monday, May 23, 2016 23:28
> Subject: Timed aggregation in Spark
> To: <user@spark.apache.org>
>
>
>
> Hi all,
>
> I want to aggregate my data for 5-10 min and then flush the aggregated
> data to some database like vertica. updateStateByKey is not exactly helpful
> in this scenario as I can't flush all the records at once, neither can I
> clear the state. I wanted to know if anyone else has faced a similar issue
> and how did they handle it.
>
> Thanks
> Nikhil
>
>
>


Timed aggregation in Spark

2016-05-23 Thread Nikhil Goyal
Hi all,

I want to aggregate my data for 5-10 minutes and then flush the aggregated
data to some database like Vertica. updateStateByKey is not exactly helpful
in this scenario, as I can't flush all the records at once, nor can I clear
the state. I wanted to know if anyone else has faced a similar issue and
how they handled it.

Thanks
Nikhil


Re: Spark session dies in about 2 days: HDFS_DELEGATION_TOKEN token can'tbe found

2016-03-14 Thread Nikhil Gs
Mine is the same scenario: I get the HDFS_DELEGATION_TOKEN issue exactly 7
days after the Spark job starts, and the job then gets killed.

I'm also looking for a solution.

Regards,
Nik.

On Fri, Mar 11, 2016 at 8:10 PM, Ruslan Dautkhanov 
wrote:

>
> Spark session dies out after ~40 hours when running against Hadoop Secure
> cluster.
>
> spark-submit has --principal and --keytab so kerberos ticket renewal works
> fine according to logs.
>
> Some happens with HDFS dfs connection?
>
> These messages come up every 1 second:
>   See complete stack: http://pastebin.com/QxcQvpqm
>
> 16/03/11 16:04:59 WARN hdfs.LeaseRenewer: Failed to renew lease for
>> [DFSClient_NONMAPREDUCE_1534318438_13] for 2802 seconds.  Will retry
>> shortly ...
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>> token (HDFS_DELEGATION_TOKEN token 1349 for rdautkha) can't be found in
>> cache
>
>
> Then in 1 hour it stops trying:
>
> 16/03/11 16:18:17 WARN hdfs.DFSClient: Failed to renew lease for
>> DFSClient_NONMAPREDUCE_1534318438_13 for 3600 seconds (>= hard-limit =3600
>> seconds.) Closing all files being written ...
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>> token (HDFS_DELEGATION_TOKEN token 1349 for rdautkha) can't be found in
>> cache
>
>
> It doesn't look it is Kerberos principal ticket renewal problem, because
> that would expire much sooner (by default we have 12 hours), and from the
> logs Spark kerberos ticket renewer works fine.
>
> It's some sort of other hdfs delegation token renewal process that breaks?
>
> RHEL 6.7
>> Spark 1.5
>> Hadoop 2.6
>
>
> Found HDFS-5322, YARN-2648 that seem relevant, but I am not sure if it's
> the same problem.
> It seems Spark problem as I only seen this problem in Spark.
> This is reproducible problem, just wait for ~40 hours and a Spark session
> is no good.
>
>
> Thanks,
> Ruslan
>
>
>


Re: Spark Token Expired Exception

2016-01-06 Thread Nikhil Gs
These are my versions

CDH version = 5.4.1
Spark version = 1.3.0
Kafka = KAFKA-0.8.2.0-1.kafka1.3.1.p0.9
HBase version = 1.0.0

Regards,
Nik.

On Wed, Jan 6, 2016 at 3:50 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Which Spark / hadoop release are you using ?
>
> Thanks
>
> On Wed, Jan 6, 2016 at 12:16 PM, Nikhil Gs <gsnikhil1432...@gmail.com>
> wrote:
>
>> Hello Team,
>>
>>
>> Thank you for your time in advance.
>>
>>
>> Below are the log lines of my spark job which is used for consuming the
>> messages from Kafka Instance and its loading to Hbase. I have noticed the
>> below Warn lines and later it resulted to errors. But I noticed that,
>> exactly after 7 days the token is getting expired and its trying to renew
>> the token but its not able to even after retrying it. Mine is a Kerberos
>> cluster. Can you please look into it and guide me whats the issue.
>>
>>
>> Your time and suggestions are very valuable.
>>
>>
>>
>> 15/12/29 11:33:50 INFO scheduler.JobScheduler: Finished job streaming job
>> 145141043 ms.0 from job set of time 145141043 ms
>>
>> 15/12/29 11:33:50 INFO scheduler.JobScheduler: Starting job streaming job
>> 145141043 ms.1 from job set of time 145141043 ms
>>
>> 15/12/29 11:33:50 INFO scheduler.JobScheduler: Finished job streaming job
>> 145141043 ms.1 from job set of time 145141043 ms
>>
>> 15/12/29 11:33:50 INFO rdd.BlockRDD: Removing RDD 120956 from persistence
>> list
>>
>> 15/12/29 11:33:50 INFO scheduler.JobScheduler: Total delay: 0.003 s for
>> time 145141043 ms (execution: 0.000 s)
>>
>> 15/12/29 11:33:50 INFO storage.BlockManager: Removing RDD 120956
>>
>> 15/12/29 11:33:50 INFO kafka.KafkaInputDStream: Removing blocks of RDD
>> BlockRDD[120956] at createStream at SparkStreamingEngine.java:40 of time
>> 145141043 ms
>>
>> 15/12/29 11:33:50 INFO rdd.MapPartitionsRDD: Removing RDD 120957 from
>> persistence list
>>
>> 15/12/29 11:33:50 INFO storage.BlockManager: Removing RDD 120957
>>
>> 15/12/29 11:33:50 INFO scheduler.ReceivedBlockTracker: Deleting batches
>> ArrayBuffer(145141041 ms)
>>
>> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Added jobs for time
>> 145141044 ms
>>
>> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Starting job streaming job
>> 145141044 ms.0 from job set of time 145141044 ms
>>
>> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Finished job streaming job
>> 145141044 ms.0 from job set of time 145141044 ms
>>
>> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Starting job streaming job
>> 145141044 ms.1 from job set of time 145141044 ms
>>
>> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Finished job streaming job
>> 145141044 ms.1 from job set of time 145141044 ms
>>
>> 15/12/29 11:34:00 INFO rdd.BlockRDD: Removing RDD 120958 from persistence
>> list
>>
>> 15/12/29 11:34:00 INFO scheduler.JobScheduler: Total delay: 0.003 s for
>> time 145141044 ms (execution: 0.001 s)
>>
>> 15/12/29 11:34:00 INFO storage.BlockManager: Removing RDD 120958
>>
>> 15/12/29 11:34:00 INFO kafka.KafkaInputDStream: Removing blocks of RDD
>> BlockRDD[120958] at createStream at SparkStreamingEngine.java:40 of time
>> 145141044 ms
>>
>> 15/12/29 11:34:00 INFO rdd.MapPartitionsRDD: Removing RDD 120959 from
>> persistence list
>>
>> 15/12/29 11:34:00 INFO storage.BlockManager: Removing RDD 120959
>>
>> 15/12/29 11:34:00 INFO scheduler.ReceivedBlockTracker: Deleting batches
>> ArrayBuffer(145141042 ms)
>>
>> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Added jobs for time
>> 145141045 ms
>>
>> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Starting job streaming job
>> 145141045 ms.0 from job set of time 145141045 ms
>>
>> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Finished job streaming job
>> 145141045 ms.0 from job set of time 145141045 ms
>>
>> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Starting job streaming job
>> 145141045 ms.1 from job set of time 145141045 ms
>>
>> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Finished job streaming job
>> 145141045 ms.1 from job set of time 145141045 ms
>>
>> 15/12/29 11:34:10 INFO rdd.BlockRDD: Removing RDD 120960 from persistence
>> list
>>
>> 15/12/29 11:34:10 INFO scheduler.JobScheduler: Total delay: 0.004 s for
>> time 145141045 ms (execution: 0.001

Spark Token Expired Exception

2016-01-06 Thread Nikhil Gs
Hello Team,


Thank you for your time in advance.


Below are the log lines from my Spark job, which consumes messages from a
Kafka instance and loads them into HBase. I noticed the WARN lines below,
which later turned into errors. Exactly 7 days after the job starts, the
token expires and the job tries to renew it, but it cannot, even after
retrying. Mine is a Kerberos cluster. Can you please look into it and
advise what the issue is?


Your time and suggestions are very valuable.



15/12/29 11:33:50 INFO scheduler.JobScheduler: Finished job streaming job
145141043 ms.0 from job set of time 145141043 ms

15/12/29 11:33:50 INFO scheduler.JobScheduler: Starting job streaming job
145141043 ms.1 from job set of time 145141043 ms

15/12/29 11:33:50 INFO scheduler.JobScheduler: Finished job streaming job
145141043 ms.1 from job set of time 145141043 ms

15/12/29 11:33:50 INFO rdd.BlockRDD: Removing RDD 120956 from persistence
list

15/12/29 11:33:50 INFO scheduler.JobScheduler: Total delay: 0.003 s for
time 145141043 ms (execution: 0.000 s)

15/12/29 11:33:50 INFO storage.BlockManager: Removing RDD 120956

15/12/29 11:33:50 INFO kafka.KafkaInputDStream: Removing blocks of RDD
BlockRDD[120956] at createStream at SparkStreamingEngine.java:40 of time
145141043 ms

15/12/29 11:33:50 INFO rdd.MapPartitionsRDD: Removing RDD 120957 from
persistence list

15/12/29 11:33:50 INFO storage.BlockManager: Removing RDD 120957

15/12/29 11:33:50 INFO scheduler.ReceivedBlockTracker: Deleting batches
ArrayBuffer(145141041 ms)

15/12/29 11:34:00 INFO scheduler.JobScheduler: Added jobs for time
145141044 ms

15/12/29 11:34:00 INFO scheduler.JobScheduler: Starting job streaming job
145141044 ms.0 from job set of time 145141044 ms

15/12/29 11:34:00 INFO scheduler.JobScheduler: Finished job streaming job
145141044 ms.0 from job set of time 145141044 ms

15/12/29 11:34:00 INFO scheduler.JobScheduler: Starting job streaming job
145141044 ms.1 from job set of time 145141044 ms

15/12/29 11:34:00 INFO scheduler.JobScheduler: Finished job streaming job
145141044 ms.1 from job set of time 145141044 ms

15/12/29 11:34:00 INFO rdd.BlockRDD: Removing RDD 120958 from persistence
list

15/12/29 11:34:00 INFO scheduler.JobScheduler: Total delay: 0.003 s for
time 145141044 ms (execution: 0.001 s)

15/12/29 11:34:00 INFO storage.BlockManager: Removing RDD 120958

15/12/29 11:34:00 INFO kafka.KafkaInputDStream: Removing blocks of RDD
BlockRDD[120958] at createStream at SparkStreamingEngine.java:40 of time
145141044 ms

15/12/29 11:34:00 INFO rdd.MapPartitionsRDD: Removing RDD 120959 from
persistence list

15/12/29 11:34:00 INFO storage.BlockManager: Removing RDD 120959

15/12/29 11:34:00 INFO scheduler.ReceivedBlockTracker: Deleting batches
ArrayBuffer(145141042 ms)

15/12/29 11:34:10 INFO scheduler.JobScheduler: Added jobs for time
145141045 ms

15/12/29 11:34:10 INFO scheduler.JobScheduler: Starting job streaming job
145141045 ms.0 from job set of time 145141045 ms

15/12/29 11:34:10 INFO scheduler.JobScheduler: Finished job streaming job
145141045 ms.0 from job set of time 145141045 ms

15/12/29 11:34:10 INFO scheduler.JobScheduler: Starting job streaming job
145141045 ms.1 from job set of time 145141045 ms

15/12/29 11:34:10 INFO scheduler.JobScheduler: Finished job streaming job
145141045 ms.1 from job set of time 145141045 ms

15/12/29 11:34:10 INFO rdd.BlockRDD: Removing RDD 120960 from persistence
list

15/12/29 11:34:10 INFO scheduler.JobScheduler: Total delay: 0.004 s for
time 145141045 ms (execution: 0.001 s)

15/12/29 11:34:10 INFO storage.BlockManager: Removing RDD 120960

15/12/29 11:34:10 INFO kafka.KafkaInputDStream: Removing blocks of RDD
BlockRDD[120960] at createStream at SparkStreamingEngine.java:40 of time
145141045 ms

15/12/29 11:34:10 INFO rdd.MapPartitionsRDD: Removing RDD 120961 from
persistence list

15/12/29 11:34:10 INFO storage.BlockManager: Removing RDD 120961

15/12/29 11:34:10 INFO scheduler.ReceivedBlockTracker: Deleting batches
ArrayBuffer(145141043 ms)

15/12/29 11:34:13 WARN security.UserGroupInformation:
PriviledgedActionException as:s (auth:SIMPLE)
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
token (HDFS_DELEGATION_TOKEN token 3104414 for s) is expired

15/12/29 11:34:13 *WARN ipc.Client: Exception encountered while connecting
to the server* :
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
token (HDFS_DELEGATION_TOKEN token 3104414 for s) is expired

15/12/29 11:34:13 *WARN security.UserGroupInformation:
PriviledgedActionException as:s (auth:SIMPLE) *
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):*
token 

Re: Classification model method not found

2015-12-22 Thread Nikhil Joshi
Hi Ted,

Thanks. That fixed the issue :).

Nikhil
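
(For later readers, the fix Ted describes boils down to the uid-constructor
pattern that Params.defaultCopy relies on. A minimal compilable illustration
follows; ExtendedLRLike is a stand-in name and the real ExtendedLR params and
training logic are elided.)

import org.apache.spark.ml.param.{ParamMap, Params}
import org.apache.spark.ml.util.Identifiable

// defaultCopy instantiates the stage reflectively through a (String) ctor,
// so the class must expose one that takes the uid; the no-arg ctor just
// generates a random uid.
class ExtendedLRLike(override val uid: String) extends Params {
  def this() = this(Identifiable.randomUID("extendedLR"))
  override def copy(extra: ParamMap): ExtendedLRLike = defaultCopy(extra)
}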

On Tue, Dec 22, 2015 at 1:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Looks like you should define ctor for ExtendedLR which accepts String
> (the uid).
>
> Cheers
>
> On Tue, Dec 22, 2015 at 1:04 PM, njoshi <nikhil.jo...@teamaol.com> wrote:
>
>> Hi,
>>
>> I have a custom extended LogisticRegression model which I want to test
>> against a parameter grid search. I am running as follows:
>>
>> /
>>val exLR = new ExtendedLR()
>>   .setMaxIter(100)
>>   .setFitIntercept(true)
>>
>> /*
>>  * Cross Validator parameter grid
>>  */
>> val paramGrid = new ParamGridBuilder()
>>   .addGrid(exLR.regParam, Array(1e-7, 1e-6, 1e-5, 1e-4, 1e-3, 2e-3,
>> 1e-2, 1e-1, 0.001341682))
>>   .addGrid(exLR.elasticNetParam, Array(0.95))
>>   .build()
>>
>>
>> /*
>>  * Perform cross validation over the parameters
>>  */
>> val cv = new CrossValidator()
>>   .setEstimator(exLR)
>>   .setEvaluator(new BinaryClassificationEvaluator)
>>   .setEstimatorParamMaps(paramGrid)
>>   .setNumFolds(10)
>>
>> /*
>>  * Run the grid search and pick up the best model
>>  */
>> val bestModel = cv.fit(trainingData)
>>  .bestModel.asInstanceOf[ExtendedLRModel]
>> /
>>
>> While run individually  (exLR.fit(trainingData) way)  it works fine, the
>> crossValidation code produces the following error.
>>
>> /
>> java.lang.NoSuchMethodException:
>> org.apache.spark.ml.classification.ExtendedLR.(java.lang.String)
>> at java.lang.Class.getConstructor0(Class.java:3082)
>> at java.lang.Class.getConstructor(Class.java:1825)
>> at
>> org.apache.spark.ml.param.Params$class.defaultCopy(params.scala:529)
>> at
>> org.apache.spark.ml.PipelineStage.defaultCopy(Pipeline.scala:37)
>> at
>>
>> org.apache.spark.ml.classification.ExtendedLR.copy(FactorizationMachine.scala:434)
>> at
>>
>> org.apache.spark.ml.classification.ExtendedLR.copy(FactorizationMachine.scala:156)
>> at org.apache.spark.ml.Estimator.fit(Estimator.scala:59)
>> at
>> org.apache.spark.ml.Estimator$$anonfun$fit$1.apply(Estimator.scala:78)
>> at
>> org.apache.spark.ml.Estimator$$anonfun$fit$1.apply(Estimator.scala:78)
>> at
>>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>> at
>>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>> at
>>
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>> at org.apache.spark.ml.Estimator.fit(Estimator.scala:78)
>> at
>>
>> org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1.apply(CrossValidator.scala:89)
>> at
>>
>> org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1.apply(CrossValidator.scala:84)
>> at
>>
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> at
>> org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:84)
>> at com.aol.advertising.ml.Driver$.main(Driver.scala:244)
>> at com.aol.advertising.ml.Driver.main(Driver.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> /
>>
>> Is there anything, such as implicits, I need to add someplace?
>> Note, *ExtendedLR* has exact same inheritance tree.
>>
>> Thanks in advance,
>> Nikhil
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Classification-model-init-method-not-found-tp25770.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 

Nikhil Joshi
Principal Data Scientist
AOL Platforms
395 Page Mill Rd, Palo Alto, CA 94306-2024
vvmr: 8894737


Re: Unable to import SharedSparkContext

2015-11-18 Thread Nikhil Joshi
Thanks Marcelo and Sourigna. I see the spark-testing-base being part of
Spark, but has been included under test package of Spark-core. That caused
the trouble :(.

On Wed, Nov 18, 2015 at 11:26 AM, Sourigna Phetsarath <
gna.phetsar...@teamaol.com> wrote:

> Plus this article:
> http://blog.cloudera.com/blog/2015/09/making-apache-spark-testing-easy-with-spark-testing-base/
>
> On Wed, Nov 18, 2015 at 2:25 PM, Sourigna Phetsarath <
> gna.phetsar...@teamaol.com> wrote:
>
>> Nikhil,
>>
>> Please take a look at: https://github.com/holdenk/spark-testing-base
>>
>> On Wed, Nov 18, 2015 at 2:12 PM, Marcelo Vanzin <van...@cloudera.com>
>> wrote:
>>
>>> On Wed, Nov 18, 2015 at 11:08 AM, njoshi <nikhil.jo...@teamaol.com>
>>> wrote:
>>> > Doesn't *SharedSparkContext* come with spark-core? Do I need to
>>> include any
>>> > special package in the library dependancies for using
>>> SharedSparkContext?
>>>
>>> That's a test class. It's not part of the Spark API.
>>>
>>> --
>>> Marcelo
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>>
>>
>> *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
>> Applied Research Chapter
>> 770 Broadway, 5th Floor, New York, NY 10003
>> o: 212.402.4871 // m: 917.373.7363
>> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>>
>> * <http://www.aolplatforms.com>*
>>
>
>
>
> --
>
>
> *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
> Applied Research Chapter
> 770 Broadway, 5th Floor, New York, NY 10003
> o: 212.402.4871 // m: 917.373.7363
> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>
> * <http://www.aolplatforms.com>*
>



-- 

Nikhil Joshi
Principal Data Scientist
AOL Platforms
395 Page Mill Rd, Palo Alto, CA 94306-2024
vvmr: 8894737


Re: Spark Job is getting killed after certain hours

2015-11-17 Thread Nikhil Gs
Hello Everyone,

Firstly, thank you so much for the response. In our cluster we are using
Spark 1.3.0, and our cluster version is CDH 5.4.1. Yes, we are also using
Kerberos in our cluster, and the Kerberos version is 1.10.3.

The error *"GSS initiate failed [Caused by GSSException: No valid
credentials provided"* was occurring when we were trying to load data from
a Kafka topic into HBase using Spark classes and a spark-submit job.

My question is: we also have another project, named XXX, in our cluster
which is successfully deployed and running, and the scenario for that
project is flume + spark-submit + HBase table. That scenario works fine in
our Kerberos cluster, so why not for Kafka topic + spark-submit + HBase
table?

Are we doing anything wrong? We are not able to figure it out. Please
advise.

Thanks in advance!

Regards,
Nik.

On Tue, Nov 17, 2015 at 4:03 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 17 Nov 2015, at 02:00, Nikhil Gs <gsnikhil1432...@gmail.com> wrote:
>
> Hello Team,
>
> Below is the error which we are facing in our cluster after 14 hours of
> starting the spark submit job. Not able to understand the issue and why its
> facing the below error after certain time.
>
> If any of you have faced the same scenario or if you have any idea then
> please guide us. To identify the issue, if you need any other info then
> please revert me back with the requirement.Thanks a lot in advance.
>
> *Log Error:  *
>
> 15/11/16 04:54:48 ERROR ipc.AbstractRpcClient: SASL authentication failed.
> The most likely cause is missing or invalid credentials. Consider 'kinit'.
>
> javax.security.sasl.SaslException: *GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]*
>
>
> I keep my list of causes of error messages online:
> https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/sections/errors.html
>
> Spark only supports long-lived work on a Kerberos cluster from 1.5+, with a
> keytab being supplied to the job. Without this, the YARN client grabs some
> tickets at launch time and hangs on until they expire, which for you is 14
> hours.
>
> (For anyone using ticket-at-launch auth, know that Spark 1.5.0-1.5.2
> doesn't talk to Hive on a kerberized cluster; there are some
> reflection-related issues which weren't picked up during testing. 1.5.3
> will fix this.)
>
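
For reference, a minimal sketch of the keytab-based setup described in the
quoted reply, assuming Spark 1.5+ on YARN (the principal and keytab path are
placeholders, and the same values can instead be passed to spark-submit as
--principal / --keytab):

import org.apache.spark.{SparkConf, SparkContext}

// With a principal and keytab supplied, the YARN client can keep obtaining
// fresh tickets and delegation tokens, so the job is not limited to the
// lifetime of the tickets grabbed at launch (the ~14 hours seen here).
val conf = new SparkConf()
  .setAppName("kafka-to-hbase")
  .set("spark.yarn.principal", "svc-streaming@EXAMPLE.COM")                 // placeholder
  .set("spark.yarn.keytab", "/etc/security/keytabs/svc-streaming.keytab")   // placeholder

val sc = new SparkContext(conf)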


Re: Spark LogisticRegression returns scaled coefficients

2015-11-17 Thread Nikhil Joshi
Hi,

Wonderful. I was sampling the output, but with a bug. Your comment brought
the realization :). I was indeed victimized by the complete separability
issue :).

Thanks a lot.
with regards,
Nikhil

On Tue, Nov 17, 2015 at 5:26 PM, DB Tsai <dbt...@dbtsai.com> wrote:

> How do you compute the probability given the weights? Also, given a
> probability, you need to sample positive and negative based on the
> probability, and how do you do this? I'm pretty sure that the LoR will
> give you correct weights, and please see the
> generateMultinomialLogisticInput  in
>
> https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
>
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
>
> On Tue, Nov 17, 2015 at 4:11 PM, njoshi <nikhil.jo...@teamaol.com> wrote:
> > I am testing the LogisticRegression performance on synthetically
> > generated data. The weights I have as input are
> >
> >    w = [2, 3, 4]
> >
> > with no intercept and three features. After training on 1000
> > synthetically generated datapoints, assuming a random normal
> > distribution for each feature, the Spark
> > LogisticRegression model I obtain has weights as
> >
> >  [6.005520656096823,9.35980263762698,12.203400879214152]
> >
> > I can see that each weight is scaled by a factor close to '3' w.r.t. the
> > original values. I am unable to guess the reason behind this. The code is
> > simple enough as
> >
> >
> > /*
> >  * Logistic Regression model
> >  */
> > val lr = new LogisticRegression()
> >   .setMaxIter(50)
> >   .setRegParam(0.001)
> >   .setElasticNetParam(0.95)
> >   .setFitIntercept(false)
> >
> > val lrModel = lr.fit(trainingData)
> >
> >
> > println(s"${lrModel.weights}")
> >
> >
> >
> > I would greatly appreciate it if someone could shed some light on
> > what's fishy here.
> >
> > with kind regards, Nikhil
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-LogisticRegression-returns-scaled-coefficients-tp25405.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>



-- 

Nikhil Joshi
Principal Data Scientist
AOL Platforms
395 Page Mill Rd, Palo Alto, CA 94306-2024
vvmr: 8894737


Spark Job is getting killed after certain hours

2015-11-16 Thread Nikhil Gs
Hello Team,

Below is the error which we are facing in our cluster 14 hours after
starting the spark-submit job. We are not able to understand the issue, or
why it hits the error below after a certain time.

If any of you have faced the same scenario, or if you have any idea, then
please guide us. If you need any other information to identify the issue,
please let me know what is required. Thanks a lot in advance.

*Log Error:  *

15/11/16 04:54:48 ERROR ipc.AbstractRpcClient: SASL authentication failed.
The most likely cause is missing or invalid credentials. Consider 'kinit'.

javax.security.sasl.SaslException: *GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to
find any Kerberos tgt)]*

at
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)

at
org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)

at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)

at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)

at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)

at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)

at java.security.AccessController.doPrivileged(Native
Method)

at javax.security.auth.Subject.doAs(Subject.java:415)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)

at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)

at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)

at
org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)

at
org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1174)

at
org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)

at
org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)

at
org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)

at
org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)

at
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)

at
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)

at
org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:369)

at
org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:320)

at
org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206)

at
org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:183)

at
org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1482)

at
org.apache.hadoop.hbase.client.HTable.put(HTable.java:1095)

at
com.suxk.bigdata.pulse.consumer.ModempollHbaseLoadHelper$1.run(ModempollHbaseLoadHelper.java:89)

at java.security.AccessController.doPrivileged(Native
Method)

at javax.security.auth.Subject.doAs(Subject.java:356)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1651)

at
com.suxk.bigdata.pulse.consumer.ModempollHbaseLoadHelper.loadToHbase(ModempollHbaseLoadHelper.java:48)

at
com.suxk.bigdata.pulse.consumer.ModempollSparkStreamingEngine$1.call(ModempollSparkStreamingEngine.java:52)

at
com.suxk.bigdata.pulse.consumer.ModempollSparkStreamingEngine$1.call(ModempollSparkStreamingEngine.java:48)

at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)

at
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

at
scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

at
scala.collection.Iterator$class.foreach(Iterator.scala:727)

at
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

at scala.collection.TraversableOnce$class.to

Kafka and Spark combination

2015-10-09 Thread Nikhil Gs
Has anyone worked with Kafka in a scenario where the streaming data from
the Kafka consumer is picked up by Spark (Java) functionality and directly
placed in HBase?
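
For concreteness, the kind of pattern we are after looks roughly like this
(a rough sketch in Scala rather than Java; the broker, topic, table and
column names are made up):

import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-to-hbase"), Seconds(10))

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc,
  Map("metadata.broker.list" -> "broker1:9092"),   // placeholder broker
  Set("events"))                                   // placeholder topic

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One HBase table handle per partition, not per record.
    val table = new HTable(HBaseConfiguration.create(), "events_table")  // placeholder table
    records.foreach { case (key, value) =>
      // Row-key derivation is application specific; the Kafka key is assumed
      // to be non-null here purely for illustration.
      val put = new Put(Bytes.toBytes(key))
      put.add(Bytes.toBytes("d"), Bytes.toBytes("raw"), Bytes.toBytes(value))
      table.put(put)
    }
    table.close()
  }
}

ssc.start()
ssc.awaitTermination()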

Regards,
Gs.


Re: Kafka streaming "at least once" semantics

2015-10-09 Thread Nikhil Gs
Hello Everyone,

Has anyone worked with Kafka in a scenario where the streaming data from
the Kafka consumer is picked up by Spark (Java) functionality and directly
placed in HBase?

Please let me know; we are completely new to this scenario. That would be
very helpful.

Regards,
GS.

Regards,
Nik.

On Fri, Oct 9, 2015 at 7:30 AM, pushkar priyadarshi <
priyadarshi.push...@gmail.com> wrote:

>
> I think the Spark 1.5 Kafka direct approach does not store messages;
> rather, it fetches messages as and when they are consumed in the
> pipeline. That would prevent you from having data loss.
>
>
>
> On Fri, Oct 9, 2015 at 7:34 AM, bitborn  wrote:
>
>> Hi all,
>>
>> My company is using Spark streaming and the Kafka API's to process an
>> event
>> stream. We've got most of our application written, but are stuck on "at
>> least once" processing.
>>
>> I created a demo to show roughly what we're doing here:
>> https://github.com/bitborn/resilient-kafka-streaming-in-spark
>> 
>>
>> The problem we're having is that when the application experiences an
>> exception (network issue, out of memory, etc.) it will drop the batch
>> it's processing. The ideal behavior is that it will process each event
>> "at least once", even if that means processing it more than once.
>> Whether this happens via checkpointing, the WAL, or Kafka offsets is
>> irrelevant, as long as we don't drop data. :)
>>
>> A couple of things we've tried:
>> - Using the kafka direct stream API (via  Cody Koeninger
>> <
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>> >
>> )
>> - Using checkpointing with both the low-level and high-level API's
>> - Enabling the write ahead log
>>
>> I've included a log here  spark.log
>> <
>> https://github.com/bitborn/resilient-kafka-streaming-in-spark/blob/master/spark.log
>> >
>> , but I'm afraid it doesn't reveal much.
>>
>> The fact that others seem to be able to get this working properly suggests
>> we're missing some magic configuration or are possibly executing it in a
>> way
>> that won't support the desired behavior.
>>
>> I'd really appreciate some pointers!
>>
>> Thanks much,
>> Andrew Clarkson
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-at-least-once-semantics-tp24995.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
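
On the at-least-once question quoted above, the usual shape people describe
is the direct stream inside a checkpointed StreamingContext, so that a
restarted driver resumes from the saved offsets instead of dropping the
in-flight batch. A skeletal, untested sketch (checkpoint path, broker and
topic are placeholders, and the per-batch write must itself be idempotent or
transactional):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = "hdfs:///tmp/at-least-once-checkpoint"   // placeholder

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("at-least-once"), Seconds(5))
  ssc.checkpoint(checkpointDir)

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    Map("metadata.broker.list" -> "broker1:9092"),   // placeholder
    Set("events"))                                   // placeholder

  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // Replace with an idempotent (or transactional) write: after a failure
      // the batch is replayed, which is exactly the "at least once" part.
      records.foreach(_ => ())
    }
  }
  ssc
}

// A fresh start builds a new context; after a crash the context is rebuilt
// from the checkpoint and picks up from the stored Kafka offsets.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()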


PySpark Unknown Opcode Error

2015-05-26 Thread Nikhil Muralidhar
Hello,
  I am trying to run a Spark job (which runs fine on the master node of the
cluster) on an HDFS Hadoop cluster using YARN. When I run the job, which has
an rdd.saveAsTextFile() call in it, I get the following error:

*SystemError: unknown opcode*

The entire stacktrace has been appended to this message.

 All the nodes on the cluster, including the master, have Python 2.7.9
running on them, and all of them have the variable SPARK_PYTHON set to the
Anaconda Python path. When I try the pyspark shell on these instances, they
use Anaconda Python to open up the Spark shell.

I installed anaconda on all slaves after looking at the python version
incompatibility issues mentioned in the following post:


http://glennklockwood.blogspot.com/2014/06/spark-on-supercomputers-few-notes.html

Please let me know what the issue might be.

The Spark version we are using is Spark 1.3.
15/05/26 18:03:55 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in 
memory on ip-10-64-10-221.ec2.internal:36266 (size: 5.1 KB, free: 445.4 MB)
15/05/26 18:03:55 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in 
memory on ip-10-64-10-222.ec2.internal:33470 (size: 5.1 KB, free: 445.4 MB)
15/05/26 18:03:55 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on ip-10-64-10-221.ec2.internal:36266 (size: 18.8 KB, free: 445.4 MB)
15/05/26 18:03:55 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on ip-10-64-10-222.ec2.internal:33470 (size: 18.8 KB, free: 445.4 MB)
15/05/26 18:03:56 WARN scheduler.TaskSetManager: Lost task 20.0 in stage 0.0 
(TID 7, ip-10-64-10-221.ec2.internal): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File /home/hadoop/spark/python/pyspark/worker.py, line 101, in main
process()
  File /home/hadoop/spark/python/pyspark/worker.py, line 96, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File /home/hadoop/spark/python/pyspark/rdd.py, line 2252, in pipeline_func
return func(split, prev_func(split, iterator))
  File /home/hadoop/spark/python/pyspark/rdd.py, line 2252, in pipeline_func
return func(split, prev_func(split, iterator))
  File /home/hadoop/spark/python/pyspark/rdd.py, line 282, in func
return f(iterator)
  File /home/hadoop/spark/python/pyspark/rdd.py, line 1704, in combineLocally
if spill else InMemoryMerger(agg)
SystemError: unknown opcode

at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:311)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

15/05/26 18:03:56 INFO scheduler.TaskSetManager: Lost task 2.0 in stage 0.0 
(TID 0) on executor ip-10-64-10-221.ec2.internal: 
org.apache.spark.api.python.PythonException (Traceback (most recent call last):
  File /home/hadoop/spark/python/pyspark/worker.py, line 101, in main
process()
  File /home/hadoop/spark/python/pyspark/worker.py, line 96, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File /home/hadoop/spark/python/pyspark/rdd.py, line 2252, in pipeline_func
return func(split, prev_func(split, iterator))
  File /home/hadoop/spark/python/pyspark/rdd.py, line 2252, in pipeline_func
return func(split, prev_func(split, iterator))
  File /home/hadoop/spark/python/pyspark/rdd.py, line 282, in func
return f(iterator)
  File /home/hadoop/spark/python/pyspark/rdd.py, line 1704, in combineLocally
if spill else InMemoryMerger(agg)
SystemError: unknown opcode
) [duplicate 1]
15/05/26 18:03:56 INFO scheduler.TaskSetManager: Lost task 21.0 in stage 0.0 
(TID 8) on executor ip-10-64-10-221.ec2.internal: 
org.apache.spark.api.python.PythonException (Traceback (most recent call last):
  File /home/hadoop/spark/python/pyspark/worker.py, line 101, in main
process()
  File /home/hadoop/spark/python/pyspark/worker.py, line 96, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File /home/hadoop/spark/python/pyspark/rdd.py, line 2252, in pipeline_func

Re: PySpark Job throwing IOError

2015-05-19 Thread Muralidhar, Nikhil
Hello all,
  I have an error in PySpark for which I have not the faintest idea of the
cause. All I can tell from the stack trace is that it can't find a pyspark file
on the path /mnt/spark-*/pyspark-*. Beyond that, I need someone more
experienced with Spark than me to look into it, help diagnose the problem,
and suggest potential solutions, hence I am looking to this group for help.


If anyone wants to read the same question on Stack Overflow here is the link:  
http://stackoverflow.com/questions/30328104/pyspark-job-throwing-ioerror

Here's the same thing pasted as raw text:


I am trying to write a simple KNN job using PySpark on an HDFS cluster. I am
using very few input files for the job, so I don't think it's a memory (space)
issue. I do not do a broadcast in any part of my code, so it is surprising to
me that broadcast.py fails. I do, however, have Python dictionaries in shared
memory without explicitly doing a broadcast.

Can anyone help me understand what is going on?

I have appended my python file and the stack trace to this email.


Thanks,

Nikhil

from pyspark.mllib.linalg import SparseVector
from pyspark import SparkContext
import glob
import sys
import time
import subprocess
from itertools import combinations

# We create user and item indices starting from 0 to #users and 0 to #items
# respectively. This is done to store them in SparseVectors as dicts.
def create_indices(inputdir):
    items = dict()
    user_id_to_idx = dict()
    user_idx_to_id = dict()
    item_idx_to_id = dict()
    item_id_to_idx = dict()
    item_idx = 0
    user_idx = 0

    # Stream the input files off HDFS and assign a dense index to every
    # item id (column 0) and user id (column 1).
    cat = subprocess.Popen(["hadoop", "fs", "-cat", "/user/hadoop/" + inputdir + "/*.txt"],
                           stdout=subprocess.PIPE)
    for line in cat.stdout:
        toks = map(str, line.strip().split("\t"))
        try:
            user_id_to_idx[toks[1].strip()]
        except KeyError:
            if toks[1].strip() != None:
                user_id_to_idx[toks[1].strip()] = user_idx
                user_idx_to_id[user_idx] = toks[1].strip()
                user_idx += 1
        try:
            item_id_to_idx[toks[0].strip()]
        except KeyError:
            if toks[0].strip() != None:
                item_id_to_idx[toks[0].strip()] = item_idx
                item_idx_to_id[item_idx] = toks[0].strip()
                item_idx += 1
    return user_idx_to_id, user_id_to_idx, item_idx_to_id, item_id_to_idx, user_idx, item_idx

def concat_helper(a, b):
    # Merge two partial {user_idx: 1} dicts for the same item.
    if a != None and b != None:
        print a, b, a.update(b)  # debug print; note a.update(b) also mutates a
        temp = dict()
        temp.update(a)
        temp.update(b)
        return temp
    elif a != None:
        return a
    elif b != None:
        return b

# pass in the hdfs path to the input files and the spark context.
def runKNN(inputdir, sc, user_id_to_idx, item_id_to_idx):
    rdd_text = sc.textFile(inputdir)
    try:
        new_rdd = rdd_text.map(lambda x: (item_id_to_idx[str(x.strip().split("\t")[0])],
                                          {user_id_to_idx[str(x.strip().split("\t")[1])]: 1})) \
                          .reduceByKey(lambda x, y: concat_helper(x, y)) \
                          .sortByKey()
    except KeyError:
        # Note: the map is lazy, so a KeyError would actually surface at action
        # time on the executors, not here.
        print item_id_to_idx.keys()
        pass
    return new_rdd

if __name__ == "__main__":
    sc = SparkContext()
    u_idx_to_id, u_id_to_idx, i_idx_to_id, i_id_to_idx, u_idx, i_idx = create_indices(sys.argv[1])

    # Broadcast the lookup tables; note the closures below still capture the
    # plain dicts directly, so these broadcast variables are never dereferenced.
    u_idx_to_id_b = sc.broadcast(u_idx_to_id)
    u_id_to_idx_b = sc.broadcast(u_id_to_idx)
    i_idx_to_idx_b = sc.broadcast(i_idx_to_id)
    i_id_to_idx_b = sc.broadcast(i_id_to_idx)
    num_users = sc.broadcast(u_idx)
    num_items = sc.broadcast(i_idx)
    item_dict_rdd = runKNN(sys.argv[1], sc, u_id_to_idx, i_id_to_idx)

    item_dict_rdd_new = item_dict_rdd.map(lambda x: (x[0], SparseVector(i_idx, x[1])))
    item_dict_rdd_new.saveAsTextFile("hdfs://output_path")
    # dot_products_rdd = map(lambda (x, y): (x, y), combinations(item_dict_rdd_new.map(lambda x: x), 2))
    # dot_products_rdd is only defined by the commented-out line above, so this
    # save raises a NameError as written:
    dot_products_rdd.saveAsTextFile("hdfs://output_path_2")

[attachment: stacktrace]

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Query data in Spark RDD

2015-02-23 Thread Nikhil Bafna
Tathagata - Yes, I'm thinking along those lines.

The problem is how to send the query to the backend. Should I bundle an
HTTP server into the Spark Streaming job, which will accept the query
parameters?
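
Roughly what I am picturing (a sketch only, against the Spark 1.2-era API;
the case class, table name and the handleQuery hook are all made up): the
streaming job keeps re-registering the aggregated batch as a temp table, and
whatever HTTP/RPC endpoint the dashboard talks to simply calls sql() on
demand:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

case class Metric(dimension: String, bucket: Long, count: Long)   // made-up schema

// Lazily-created singleton so every batch reuses the same SQLContext.
object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sc: SparkContext): SQLContext = synchronized {
    if (instance == null) instance = new SQLContext(sc)
    instance
  }
}

def publish(aggregated: DStream[((String, Long), Long)]): Unit = {
  aggregated.foreachRDD { rdd =>
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    import sqlContext.createSchemaRDD   // implicit RDD[Product] -> SchemaRDD (Spark 1.2)
    rdd.map { case ((dim, bucket), cnt) => Metric(dim, bucket, cnt) }
       .registerTempTable("metrics")    // replaces the table every batch interval
  }
}

// Called by the embedded HTTP/RPC endpoint (hypothetical) with a query string
// from the dashboard, e.g. "SELECT dimension, SUM(count) FROM metrics GROUP BY dimension".
def handleQuery(sc: SparkContext, query: String): Array[String] =
  SQLContextSingleton.getInstance(sc).sql(query).collect().map(_.toString)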

--
Nikhil Bafna

On Mon, Feb 23, 2015 at 2:04 PM, Tathagata Das t...@databricks.com wrote:

 You will have to build a split infrastructure - a front end that takes the
 queries from the UI and sends them to the backend, and the backend (running
 the Spark Streaming app) will actually run the queries on tables created in
 the contexts. The RPCs necessary between the frontend and backend will need
 to be implemented by you.

 On Sat, Feb 21, 2015 at 11:57 PM, Nikhil Bafna nikhil.ba...@flipkart.com
 wrote:


 Yes. As I understand it, it would allow me to write SQL to query a
 Spark context, but the query needs to be specified within a deployed job.

 What I want is to be able to run multiple dynamic queries specified at
 runtime from a dashboard.



 --
 Nikhil Bafna

 On Sat, Feb 21, 2015 at 8:37 PM, Ted Yu yuzhih...@gmail.com wrote:

 Have you looked at
 http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
 ?

 Cheers

 On Sat, Feb 21, 2015 at 4:24 AM, Nikhil Bafna nikhil.ba...@flipkart.com
  wrote:


 Hi.

 My use case is building a realtime monitoring system over
 multi-dimensional data.

 The way I'm planning to go about it is to use Spark Streaming to store
 aggregated count over all dimensions in 10 sec interval.

 Then, from a dashboard, I would be able to specify a query over some
 dimensions, which will need re-aggregation from the already computed job.

 My query is, how can I run dynamic queries over data in schema RDDs?

 --
 Nikhil Bafna







Re: Query data in Spark RDD

2015-02-21 Thread Nikhil Bafna
Yes. As I understand it, it would allow me to write SQL to query a Spark
context, but the query needs to be specified within a deployed job.

What I want is to be able to run multiple dynamic queries specified at
runtime from a dashboard.



--
Nikhil Bafna

On Sat, Feb 21, 2015 at 8:37 PM, Ted Yu yuzhih...@gmail.com wrote:

 Have you looked at
 http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
 ?

 Cheers

 On Sat, Feb 21, 2015 at 4:24 AM, Nikhil Bafna nikhil.ba...@flipkart.com
 wrote:


 Hi.

 My use case is building a realtime monitoring system over
 multi-dimensional data.

 The way I'm planning to go about it is to use Spark Streaming to store
 aggregated count over all dimensions in 10 sec interval.

 Then, from a dashboard, I would be able to specify a query over some
 dimensions, which will need re-aggregation from the already computed job.

 My query is, how can I run dynamic queries over data in schema RDDs?

 --
 Nikhil Bafna





Query data in Spark RDD

2015-02-21 Thread Nikhil Bafna
Hi.

My use case is building a realtime monitoring system over multi-dimensional
data.

The way I'm planning to go about it is to use Spark Streaming to store
aggregated counts over all dimensions at a 10 second interval.

Then, from a dashboard, I would be able to specify a query over some
dimensions, which will need re-aggregation of what the job has already
computed.

My question is: how can I run dynamic queries over the data in SchemaRDDs?

--
Nikhil Bafna


Re: How to Integrate openNLP with Spark

2014-12-04 Thread Nikhil
Did anyone get a chance to look at this?

Please provide some help.

Thanks
Nikhil



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-Integrate-openNLP-with-Spark-tp20117p20368.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to Integrate openNLP with Spark

2014-12-01 Thread Nikhil
Hi,

I am using OpenNLP NER (the Token Name Finder) for parsing unstructured
data. In order to speed up my process (to quickly train models and analyze
documents with them), I want to use Spark. I saw on the web that it is
possible to connect OpenNLP with Spark using UIMAFit, but I am not sure how
to do so. Though Philip Ogren gave a very nice presentation at Spark
Summit, I am still confused.

Can someone please provide an end-to-end example of this? I am new to Spark
and UIMAFit and have only recently started working with them.
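
Not the UIMAFit route, but for what it is worth, the plain-OpenNLP pattern
people usually start with looks roughly like this (a sketch only; the model
path, input/output paths and the choice of broadcasting the raw model bytes
are illustrative assumptions):

import java.io.ByteArrayInputStream
import java.nio.file.{Files, Paths}

import opennlp.tools.namefind.{NameFinderME, TokenNameFinderModel}
import opennlp.tools.tokenize.SimpleTokenizer
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("opennlp-ner"))

// Read the trained model once on the driver and broadcast the raw bytes;
// the OpenNLP model/finder objects themselves are rebuilt on the executors.
val modelBytes = Files.readAllBytes(Paths.get("/path/to/en-ner-person.bin"))  // placeholder
val modelBc = sc.broadcast(modelBytes)

val docs = sc.textFile("hdfs:///path/to/docs")                                // placeholder

val entities = docs.mapPartitions { lines =>
  // Rebuild the model and finder once per partition, not once per line.
  val model = new TokenNameFinderModel(new ByteArrayInputStream(modelBc.value))
  val finder = new NameFinderME(model)
  val tokenizer = SimpleTokenizer.INSTANCE
  lines.flatMap { line =>
    val tokens = tokenizer.tokenize(line)
    val names = finder.find(tokens).map(span =>
      tokens.slice(span.getStart, span.getEnd).mkString(" "))
    finder.clearAdaptiveData()   // treat each line as an independent document
    names
  }
}

entities.saveAsTextFile("hdfs:///path/to/entities")                           // placeholder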

Thanks

Nikhil Jain



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-Integrate-openNLP-with-Spark-tp20117.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org