Is there any Job/Career channel

2023-01-15 Thread Chetan Khatri
Hi Spark Users,

Is there any Job/Career channel for Apache Spark?

Thank you


How to find difference of locations in Spark DataFrame rows

2022-06-07 Thread Chetan Khatri
Hi Dear Spark Users,

It has been many years since I worked on Spark, so please help me. Thanks
much.

I have different cities and their coordinates in a DataFrame[Row]. I want to
find the distance in km and then show only those records/cities which are
within a 10 km range.

I have a function that can find the distance in km given two coordinates,
but I don't know how to apply it to rows, one to many, and calculate the
distance.

Here is some code that I wrote; sorry for the basic code.

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

// main() must live in an object to be runnable with spark-submit
object HouseMatching {
  def main(args: Array[String]): Unit = {

    val search_property_id = args(0)

    // list of columns where the condition should be an exact match
    val groupOneCriteria = List(
      "occupied_by_tenant",
      "water_index",
      "electricity_index",
      "elevator_index",
      "heating_index",
      "nb_bathtubs",
      "nb_showers",
      "nb_wc",
      "nb_rooms",
      "nb_kitchens"
    )
    // list of columns where the condition should be matching 80%
    val groupTwoCriteria = List(
      "area",
      "home_condition",
      "building_age"
    )
    // list of columns where the condition should be found using Euclidean distance
    val groupThreeCriteria = List(
      "postal_code"
    )

    val region_or_city = "region"

    // haversine distance between two coordinate pairs, in kilometres
    def haversineDistance(destination_latitude: Column, destination_longitude: Column,
                          origin_latitude: Column, origin_longitude: Column): Column = {
      val a = pow(sin(radians(destination_latitude - origin_latitude) / 2), 2) +
        cos(radians(origin_latitude)) * cos(radians(destination_latitude)) *
          pow(sin(radians(destination_longitude - origin_longitude) / 2), 2)
      val distance = atan2(sqrt(a), sqrt(-a + 1)) * 2 * 6371
      distance
    }

    val spark = SparkSession.builder().appName("real-estate-property-matcher")
      .getOrCreate()

    val housingDataDF = spark.read.csv("~/Downloads/real-estate-sample-data.csv")

    // searching for the property by `ref_id`
    val searchPropertyDF = housingDataDF.filter(col("ref_id") === search_property_id)

    // Similar house in the same city (same postal code) and group one condition
    val similarHouseAndSameCity = housingDataDF.join(searchPropertyDF,
      groupThreeCriteria ++ groupOneCriteria, "inner")

    // Similar house not in the same city but within a 10 km range
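
A minimal sketch of the one-to-many distance step, reusing the
haversineDistance column function above. It assumes the CSV exposes
latitude/longitude columns (those names are illustrative): cross-join the
search property against every candidate, add the distance as a column, and
keep only rows within the 10 km radius.

val candidatesDF = housingDataDF
  .withColumnRenamed("latitude", "dest_lat")
  .withColumnRenamed("longitude", "dest_lon")

val withinTenKmDF = searchPropertyDF
  .select(col("ref_id").as("search_ref_id"),
    col("latitude").as("origin_lat"),
    col("longitude").as("origin_lon"))
  .crossJoin(candidatesDF)
  .withColumn("distance_km",
    haversineDistance(col("dest_lat"), col("dest_lon"),
      col("origin_lat"), col("origin_lon")))
  .filter(col("distance_km") <= 10 && col("ref_id") =!= col("search_ref_id"))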


Need help on migrating Spark on Hortonworks to Kubernetes Cluster

2022-05-08 Thread Chetan Khatri
Hi Everyone, I need help with my Airflow DAG, which does a spark-submit. I
now have a Kubernetes cluster instead of the Hortonworks Linux distributed
Spark cluster. My existing spark-submit goes through a BashOperator, as below:

calculation1 = '/usr/hdp/2.6.5.0-292/spark2/bin/spark-submit  --conf
spark.yarn.maxAppAttempts=1  --conf
spark.dynamicAllocation.executorAllocationRatio=1  --conf
spark.executor.heartbeatInterval=30s  --conf
spark.dynamicAllocation.executorIdleTimeout=60s  --conf
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=15s --conf
spark.network.timeout=800s  --conf
spark.dynamicAllocation.schedulerBacklogTimeout=15s --conf
spark.shuffle.service.enabled=true --conf
spark.dynamicAllocation.enabled=true --conf
spark.dynamicAllocation.minExecutors=4 --conf
spark.dynamicAllocation.initialExecutors=4 --conf
spark.dynamicAllocation.maxExecutors=8  --conf
"spark.driver.extraJavaOptions=-Djava.util.logging.config.file=/opt/airflow/dags/logging.properties"
 --executor-cores 4  --executor-memory 8g --driver-memory 12g --master
yarn --class com.wkelms.phoenix.incremental.invoice.Calculations
/opt/airflow/dags/nextgen-phoenix-incremental-assembly-0.1.jar 1
"Incremental" "/opt/airflow/dags/load_batch_configuration.json"'
tCalculateBatch1 = BashOperator(
    task_id="calculate_batch_1",
    dag=dag,
    trigger_rule="all_success",
    bash_command=calculation1,
)

But now I have a Kubernetes cluster, and the Spark master, Spark workers,
and Airflow all run as pods, so how should this be written/designed? From
the airflow-scheduler, how can I submit the Spark job to the spark-workers?
*The Kubernetes pods are as below:*

[root@spark-phoenix ~]# kubectl get pods -A
NAMESPACE     NAME                                      READY   STATUS      RESTARTS   AGE
kube-system   helm-install-traefik-crd-dn82j            0/1     Completed   0          37d
kube-system   helm-install-traefik-vrcz8                0/1     Completed   1          37d
kube-system   local-path-provisioner-5ff76fc89d-mrgzd   1/1     Running     16         37d
kube-system   coredns-7448499f4d-92xhx                  1/1     Running     11         37d
airflow       airflow-statsd-7586f9998-j29h7            1/1     Running     1          2d10h
kube-system   metrics-server-86cbb8457f-q9tt2           1/1     Running     11         37d
kube-system   svclb-traefik-vt9xw                       2/2     Running     22         37d
airflow       airflow-postgresql-0                      1/1     Running     1          2d10h
kube-system   traefik-6b84f7cbc-csffr                   1/1     Running     11         37d
spark         spark-worker-0                            1/1     Running     11         37d
spark         spark-master-0                            1/1     Running     11         37d
spark         spark-worker-1                            1/1     Running     11         37d
airflow       airflow-triggerer-6cc8c54495-w4jzz        1/1     Running     1          2d10h
airflow       airflow-scheduler-7694ccf55-5r9kw         2/2     Running     2          2d10h
airflow       airflow-webserver-68655785c7-lmgzg        1/1     Running     0          21h
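
Since spark-master and the spark-workers run as Standalone Spark pods in the
spark namespace, one common approach is to keep the BashOperator but point
--master at the Spark master's Kubernetes Service instead of YARN and drop
the YARN/dynamic-allocation flags. This is only a sketch: the service DNS
name, port and spark-submit path below are assumptions, not taken from the
cluster above.

calculation1 = (
    '/opt/spark/bin/spark-submit '
    '--master spark://spark-master.spark.svc.cluster.local:7077 '
    '--deploy-mode client '
    '--executor-cores 4 --executor-memory 8g --driver-memory 12g '
    '--class com.wkelms.phoenix.incremental.invoice.Calculations '
    '/opt/airflow/dags/nextgen-phoenix-incremental-assembly-0.1.jar 1 '
    '"Incremental" "/opt/airflow/dags/load_batch_configuration.json"'
)

tCalculateBatch1 = BashOperator(
    task_id="calculate_batch_1",
    dag=dag,
    trigger_rule="all_success",
    bash_command=calculation1,
)

Alternatively, spark-submit can target Kubernetes directly with
--master k8s://https://<api-server>, or the SparkSubmitOperator from the
Airflow Spark provider can replace the BashOperator; either way the jar and
the spark-submit binary must be reachable from the Airflow scheduler/worker
pod.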


Re: Usage of DropDuplicate in Spark

2021-06-22 Thread Chetan Khatri
I am looking for a built-in API, if one exists at all.

On Tue, Jun 22, 2021 at 1:16 PM Chetan Khatri 
wrote:

> this has been very slow
>
> On Tue, Jun 22, 2021 at 1:15 PM Sachit Murarka 
> wrote:
>
>> Hi Chetan,
>>
>> You can substract the data frame or use except operation.
>> First DF contains full rows.
>> Second DF contains unique rows (post remove duplicates)
>> Subtract first and second DF .
>>
>> hope this helps
>>
>> Thanks
>> Sachit
>>
>> On Tue, Jun 22, 2021, 22:23 Chetan Khatri 
>> wrote:
>>
>>> Hi Spark Users,
>>>
>>> I want to use DropDuplicate, but those records which I discard. I
>>> would like to log to the instrumental table.
>>>
>>> What would be the best approach to do that?
>>>
>>> Thanks
>>>
>>
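
There is no single built-in that returns the dropped rows, but Sachit's
subtract/except idea can be sketched as below (the DataFrame, key column and
table names are illustrative; exceptAll needs Spark 2.4+):

val dedupedDF   = inputDF.dropDuplicates("key_col")   // rows that survive
val discardedDF = inputDF.exceptAll(dedupedDF)        // rows removed by dropDuplicates

discardedDF.write.mode("append").saveAsTable("instrumentation.discarded_duplicates")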


Re: Usage of DropDuplicate in Spark

2021-06-22 Thread Chetan Khatri
this has been very slow

On Tue, Jun 22, 2021 at 1:15 PM Sachit Murarka 
wrote:

> Hi Chetan,
>
> You can substract the data frame or use except operation.
> First DF contains full rows.
> Second DF contains unique rows (post remove duplicates)
> Subtract first and second DF .
>
> hope this helps
>
> Thanks
> Sachit
>
> On Tue, Jun 22, 2021, 22:23 Chetan Khatri 
> wrote:
>
>> Hi Spark Users,
>>
>> I want to use DropDuplicate, but those records which I discard. I
>> would like to log to the instrumental table.
>>
>> What would be the best approach to do that?
>>
>> Thanks
>>
>


Usage of DropDuplicate in Spark

2021-06-22 Thread Chetan Khatri
Hi Spark Users,

I want to use dropDuplicates, but I would like to log the records that get
discarded to an instrumentation table.

What would be the best approach to do that?

Thanks


Re: Performance Improvement: Collect in spark taking huge time

2021-05-05 Thread Chetan Khatri
Hi All,

Do you think replacing the collect() (used to get a Scala collection for the
loop) with the code block below would have any benefit?

cachedColumnsAddTableDF.select("reporting_table").distinct().foreach(r => {
  r.getAs("reporting_table").asInstanceOf[String]
})
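
Note that the foreach above runs on the executors and throws its result
away, so it would not produce a driver-side collection. If the table names
really are needed on the driver (to issue one DDL per table), a minimal
sketch is to collect only the single, de-duplicated column (assuming
spark.implicits._ is in scope):

val newTables: Array[String] = cachedColumnsAddTableDF
  .select("reporting_table")
  .distinct()
  .as[String]
  .collect()

newTables.foreach { table =>
  // run the CREATE TABLE DDL for `table` here
}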


On Wed, May 5, 2021 at 10:15 PM Chetan Khatri 
wrote:

> Hi All, Collect in spark is taking huge time. I want to get list of values
> of one column to Scala collection. How can I do this?
>  val newDynamicFieldTablesDF = cachedPhoenixAppMetaDataForCreateTableDF
> .select(col("reporting_table")).except(clientSchemaDF)
>   logger.info(s"### except with client-schema done " +
> LocalDateTime.now())
>   // newDynamicFieldTablesDF.cache()
>
>
> val detailsForCreateTableDF =
> cachedPhoenixAppMetaDataForCreateTableDF
>   .join(broadcast(newDynamicFieldTablesDF),
> Seq("reporting_table"), "inner")
> logger.info(s"### join with newDF done " +
> LocalDateTime.now())
>
> //detailsForCreateTableDF.cache()
>
> val newDynamicFieldTablesList = newDynamicFieldTablesDF.map(r
> => r.getString(0)).collect().toSet
>
>
> Later, I am iterating this list for one the use case to create a custom
> definition table:
>
> newDynamicFieldTablesList.foreach(table => { // running here Create table
> DDL/SQL query })
>
> Thank you so much
>


Performance Improvement: Collect in spark taking huge time

2021-05-05 Thread Chetan Khatri
Hi All, collect() in Spark is taking a huge amount of time. I want to get
the list of values of one column into a Scala collection. How can I do this?
 val newDynamicFieldTablesDF = cachedPhoenixAppMetaDataForCreateTableDF
.select(col("reporting_table")).except(clientSchemaDF)
  logger.info(s"### except with client-schema done " +
LocalDateTime.now())
  // newDynamicFieldTablesDF.cache()


val detailsForCreateTableDF =
cachedPhoenixAppMetaDataForCreateTableDF
  .join(broadcast(newDynamicFieldTablesDF),
Seq("reporting_table"), "inner")
logger.info(s"### join with newDF done " +
LocalDateTime.now())

//detailsForCreateTableDF.cache()

val newDynamicFieldTablesList = newDynamicFieldTablesDF.map(r
=> r.getString(0)).collect().toSet


Later, I am iterating over this list for one of the use cases, to create a
custom definition table:

newDynamicFieldTablesList.foreach(table => { // running here Create table
DDL/SQL query })

Thank you so much


DropNa in Spark for Columns

2021-02-26 Thread Chetan Khatri
Hi Users,

What is the equivalent of Pandas *df.dropna(axis='columns')* in Spark/Scala?

Thanks
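
There is no direct axis='columns' equivalent; DataFrameNaFunctions.drop only
removes rows. A minimal sketch of the column-wise behaviour (df is
illustrative; NaN values would need an extra isnan check):

import org.apache.spark.sql.functions._

val nullCounts = df.select(
  df.columns.map(c => count(when(col(c).isNull, c)).alias(c)): _*
).first()

val columnsToKeep = df.columns.filter(c => nullCounts.getAs[Long](c) == 0L)
val cleanedDF = df.select(columnsToKeep.map(col): _*)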


Re: Reading TB of JSON file

2020-06-19 Thread Chetan Khatri
Thanks, you mean in a for loop? Could you please put the pseudocode in Spark?

On Fri, Jun 19, 2020 at 8:39 AM Jörn Franke  wrote:

> Make every JSON object a line and then read it as JSON Lines, not as multiline.
>
> Am 19.06.2020 um 14:37 schrieb Chetan Khatri  >:
>
> 
> All transactions are in JSON; it is not a single array.
>
> On Thu, Jun 18, 2020 at 12:55 PM Stephan Wehner 
> wrote:
>
>> It's an interesting problem. What is the structure of the file? One big
>> array? On hash with many key-value pairs?
>>
>> Stephan
>>
>> On Thu, Jun 18, 2020 at 6:12 AM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>
>>> I have a 50GB of JSON file, I would like to read and persist at HDFS so
>>> it can be taken into next transformation. I am trying to read as
>>> spark.read.json(path) but this is giving Out of memory error on driver.
>>> Obviously, I can't afford having 50 GB on driver memory. In general, what
>>> is the best practice to read large JSON file like 50 GB?
>>>
>>> Thanks
>>>
>>
>>
>> --
>> Stephan Wehner, Ph.D.
>> The Buckmaster Institute, Inc.
>> 2150 Adanac Street
>> Vancouver BC V5L 2E7
>> Canada
>> Cell (604) 767-7415
>> Fax (888) 808-4655
>>
>> Sign up for our free email course
>> http://buckmaster.ca/small_business_website_mistakes.html
>>
>> http://www.buckmaster.ca
>> http://answer4img.com
>> http://loggingit.com
>> http://clocklist.com
>> http://stephansmap.org
>> http://benchology.com
>> http://www.trafficlife.com
>> http://stephan.sugarmotor.org (Personal Blog)
>> @stephanwehner (Personal Account)
>> VA7WSK (Personal call sign)
>>
>
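
A minimal sketch of Jörn's suggestion, assuming the 50 GB file can be
produced (or rewritten) as JSON Lines, i.e. one complete JSON object per
line. With line-delimited input every executor parses its own split, so
nothing close to 50 GB has to fit on the driver. The paths below are
illustrative, and supplying an explicit schema avoids a second pass for
schema inference.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("read-large-json").getOrCreate()

val df = spark.read
  .option("multiLine", "false")          // JSON Lines: each line is parsed independently
  // .schema(knownSchema)                // optional: skip schema inference
  .json("s3a://my-bucket/transactions/") // illustrative path

df.write.mode("overwrite").parquet("hdfs:///staging/transactions_parquet")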


Re: Reading TB of JSON file

2020-06-19 Thread Chetan Khatri
All transactions are in JSON; it is not a single array.

On Thu, Jun 18, 2020 at 12:55 PM Stephan Wehner 
wrote:

> It's an interesting problem. What is the structure of the file? One big
> array? On hash with many key-value pairs?
>
> Stephan
>
> On Thu, Jun 18, 2020 at 6:12 AM Chetan Khatri 
> wrote:
>
>> Hi Spark Users,
>>
>> I have a 50GB of JSON file, I would like to read and persist at HDFS so
>> it can be taken into next transformation. I am trying to read as
>> spark.read.json(path) but this is giving Out of memory error on driver.
>> Obviously, I can't afford having 50 GB on driver memory. In general, what
>> is the best practice to read large JSON file like 50 GB?
>>
>> Thanks
>>
>
>
> --
> Stephan Wehner, Ph.D.
> The Buckmaster Institute, Inc.
> 2150 Adanac Street
> Vancouver BC V5L 2E7
> Canada
> Cell (604) 767-7415
> Fax (888) 808-4655
>
> Sign up for our free email course
> http://buckmaster.ca/small_business_website_mistakes.html
>
> http://www.buckmaster.ca
> http://answer4img.com
> http://loggingit.com
> http://clocklist.com
> http://stephansmap.org
> http://benchology.com
> http://www.trafficlife.com
> http://stephan.sugarmotor.org (Personal Blog)
> @stephanwehner (Personal Account)
> VA7WSK (Personal call sign)
>


Re: Reading TB of JSON file

2020-06-19 Thread Chetan Khatri
Yes

On Thu, Jun 18, 2020 at 12:34 PM Gourav Sengupta 
wrote:

> Hi,
> So you have a single JSON record in multiple lines?
> And all the 50 GB is in one file?
>
> Regards,
> Gourav
>
> On Thu, 18 Jun 2020, 14:34 Chetan Khatri, 
> wrote:
>
>> It is dynamically generated and written at s3 bucket not historical data
>> so I guess it doesn't have jsonlines format
>>
>> On Thu, Jun 18, 2020 at 9:16 AM Jörn Franke  wrote:
>>
>>> Depends on the data types you use.
>>>
>>> Do you have in jsonlines format? Then the amount of memory plays much
>>> less a role.
>>>
>>> Otherwise if it is one large object or array I would not recommend it.
>>>
>>> > Am 18.06.2020 um 15:12 schrieb Chetan Khatri <
>>> chetan.opensou...@gmail.com>:
>>> >
>>> > 
>>> > Hi Spark Users,
>>> >
>>> > I have a 50GB of JSON file, I would like to read and persist at HDFS
>>> so it can be taken into next transformation. I am trying to read as
>>> spark.read.json(path) but this is giving Out of memory error on driver.
>>> Obviously, I can't afford having 50 GB on driver memory. In general, what
>>> is the best practice to read large JSON file like 50 GB?
>>> >
>>> > Thanks
>>>
>>


Re: Reading TB of JSON file

2020-06-18 Thread Chetan Khatri
It is dynamically generated and written to an S3 bucket, not historical
data, so I guess it doesn't have the JSON Lines format.

On Thu, Jun 18, 2020 at 9:16 AM Jörn Franke  wrote:

> Depends on the data types you use.
>
> Do you have in jsonlines format? Then the amount of memory plays much less
> a role.
>
> Otherwise if it is one large object or array I would not recommend it.
>
> > Am 18.06.2020 um 15:12 schrieb Chetan Khatri <
> chetan.opensou...@gmail.com>:
> >
> > 
> > Hi Spark Users,
> >
> > I have a 50GB of JSON file, I would like to read and persist at HDFS so
> it can be taken into next transformation. I am trying to read as
> spark.read.json(path) but this is giving Out of memory error on driver.
> Obviously, I can't afford having 50 GB on driver memory. In general, what
> is the best practice to read large JSON file like 50 GB?
> >
> > Thanks
>


Re: Reading TB of JSON file

2020-06-18 Thread Chetan Khatri
The file is available in an S3 bucket.


On Thu, Jun 18, 2020 at 9:15 AM Patrick McCarthy 
wrote:

> Assuming that the file can be easily split, I would divide it into a
> number of pieces and move those pieces to HDFS before using spark at all,
> using `hdfs dfs` or similar. At that point you can use your executors to
> perform the reading instead of the driver.
>
> On Thu, Jun 18, 2020 at 9:12 AM Chetan Khatri 
> wrote:
>
>> Hi Spark Users,
>>
>> I have a 50GB of JSON file, I would like to read and persist at HDFS so
>> it can be taken into next transformation. I am trying to read as
>> spark.read.json(path) but this is giving Out of memory error on driver.
>> Obviously, I can't afford having 50 GB on driver memory. In general, what
>> is the best practice to read large JSON file like 50 GB?
>>
>> Thanks
>>
>
>
> --
>
>
> *Patrick McCarthy  *
>
> Senior Data Scientist, Machine Learning Engineering
>
> Dstillery
>
> 470 Park Ave South, 17th Floor, NYC 10016
>


Reading TB of JSON file

2020-06-18 Thread Chetan Khatri
Hi Spark Users,

I have a 50 GB JSON file that I would like to read and persist to HDFS so it
can be taken into the next transformation. I am trying to read it as
spark.read.json(path), but this gives an out-of-memory error on the driver.
Obviously, I can't afford to have 50 GB of driver memory. In general, what
is the best practice for reading a large JSON file like 50 GB?

Thanks


Join on Condition provide at run time

2020-06-02 Thread Chetan Khatri
Hi Spark Users,

How can I provide the join ON condition at run time, in the form of a
String, to the code? Can someone please help me.
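
A minimal sketch: a join condition supplied as a String can be turned into a
Column with expr() and passed to join(). The DataFrames, aliases and column
names below are illustrative.

import org.apache.spark.sql.functions.expr

val condition = "l.customer_id = r.customer_id AND l.order_date >= r.valid_from"

val joinedDF = leftDF.alias("l")
  .join(rightDF.alias("r"), expr(condition), "inner")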


Re: Calling HTTP Rest APIs from Spark Job

2020-05-15 Thread Chetan Khatri
Hi Sean,
Thanks for great answer.

What I am trying to do is use something like Scala Futures (cats-effect IO)
to make concurrent calls. I was trying to understand whether there are any
limits/thresholds on making those calls.

On Thu, May 14, 2020 at 7:28 PM Sean Owen  wrote:

> No, it means # HTTP calls = # executor slots. But even then, you're
> welcome to, say, use thread pools to execute even more concurrently as
> most are I/O bound. Your code can do what you want.
>
> On Thu, May 14, 2020 at 6:14 PM Chetan Khatri
>  wrote:
> >
> > Thanks, that means number of executor = number of http calls, I can
> make. I can't boost more number of http calls in single executors, I mean -
> I can't go beyond the threashold of number of executors.
> >
> > On Thu, May 14, 2020 at 6:26 PM Sean Owen  wrote:
> >>
> >> Default is not 200, but the number of executor slots. Yes you can only
> simultaneously execute as many tasks as slots regardless of partitions.
> >>
> >> On Thu, May 14, 2020, 5:19 PM Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
> >>>
> >>> Thanks Sean, Jerry.
> >>>
> >>> Default Spark DataFrame partitions are 200 right? does it have
> relationship with number of cores? 8 cores - 4 workers. is not it like I
> can do only 8 * 4 = 32 http calls. Because in Spark number of partitions =
> number cores is untrue.
> >>>
> >>> Thanks
> >>>
> >>> On Thu, May 14, 2020 at 6:11 PM Sean Owen  wrote:
> >>>>
> >>>> Yes any code that you write in code that you apply with Spark runs in
> >>>> the executors. You would be running as many HTTP clients as you have
> >>>> partitions.
> >>>>
> >>>> On Thu, May 14, 2020 at 4:31 PM Jerry Vinokurov <
> grapesmo...@gmail.com> wrote:
> >>>> >
> >>>> > I believe that if you do this within the context of an operation
> that is already parallelized such as a map, the work will be distributed to
> executors and they will do it in parallel. I could be wrong about this as I
> never investigated this specific use case, though.
> >>>> >
> >>>> > On Thu, May 14, 2020 at 5:24 PM Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
> >>>> >>
> >>>> >> Thanks for the quick response.
> >>>> >>
> >>>> >> I am curious to know whether would it be parallel pulling data for
> 100+ HTTP request or it will only go on Driver node? the post body would be
> part of DataFrame. Think as I have a data frame of employee_id,
> employee_name now the http GET call has to be made for each employee_id and
> DataFrame is dynamic for each spark job run.
> >>>> >>
> >>>> >> Does it make sense?
> >>>> >>
> >>>> >> Thanks
> >>>> >>
> >>>> >>
> >>>> >> On Thu, May 14, 2020 at 5:12 PM Jerry Vinokurov <
> grapesmo...@gmail.com> wrote:
> >>>> >>>
> >>>> >>> Hi Chetan,
> >>>> >>>
> >>>> >>> You can pretty much use any client to do this. When I was using
> Spark at a previous job, we used OkHttp, but I'm sure there are plenty of
> others. In our case, we had a startup phase in which we gathered metadata
> via a REST API and then broadcast it to the workers. I think if you need
> all the workers to have access to whatever you're getting from the API,
> that's the way to do it.
> >>>> >>>
> >>>> >>> Jerry
> >>>> >>>
> >>>> >>> On Thu, May 14, 2020 at 5:03 PM Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
> >>>> >>>>
> >>>> >>>> Hi Spark Users,
> >>>> >>>>
> >>>> >>>> How can I invoke the Rest API call from Spark Code which is not
> only running on Spark Driver but distributed / parallel?
> >>>> >>>>
> >>>> >>>> Spark with Scala is my tech stack.
> >>>> >>>>
> >>>> >>>> Thanks
> >>>> >>>>
> >>>> >>>>
> >>>> >>>
> >>>> >>>
> >>>> >>> --
> >>>> >>> http://www.google.com/profiles/grapesmoker
> >>>> >
> >>>> >
> >>>> >
> >>>> > --
> >>>> > http://www.google.com/profiles/grapesmoker
>
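
A minimal sketch of the pattern discussed in this thread: doing the calls
inside mapPartitions so each task issues its own requests. The endpoint, the
input DataFrame and the plain java.net-based GET are illustrative; a
production job would use a proper client (e.g. OkHttp, as Jerry mentions)
plus retries, and, per Sean's note, could fan out further with a local
thread pool since the work is I/O-bound. spark.implicits._ is assumed to be
in scope.

import scala.io.Source

case class Employee(employee_id: String, employee_name: String)

val employeesDS = employeeDF.as[Employee]        // employeeDF is illustrative

val responsesDF = employeesDS.mapPartitions { rows =>
  rows.map { e =>
    val url = s"https://api.example.com/employees/${e.employee_id}"  // hypothetical endpoint
    val src = Source.fromURL(url)
    val body = try src.mkString finally src.close()                  // blocking GET per row
    (e.employee_id, body)
  }
}.toDF("employee_id", "response_body")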


Re: Calling HTTP Rest APIs from Spark Job

2020-05-14 Thread Chetan Khatri
Thanks, that means number of executors = number of HTTP calls I can make. I
can't push more HTTP calls through a single executor; I mean, I can't go
beyond the threshold set by the number of executors.

On Thu, May 14, 2020 at 6:26 PM Sean Owen  wrote:

> Default is not 200, but the number of executor slots. Yes you can only
> simultaneously execute as many tasks as slots regardless of partitions.
>
> On Thu, May 14, 2020, 5:19 PM Chetan Khatri 
> wrote:
>
>> Thanks Sean, Jerry.
>>
>> Default Spark DataFrame partitions are 200 right? does it have
>> relationship with number of cores? 8 cores - 4 workers. is not it like I
>> can do only 8 * 4 = 32 http calls. Because in Spark number of partitions =
>> number cores is untrue.
>>
>> Thanks
>>
>> On Thu, May 14, 2020 at 6:11 PM Sean Owen  wrote:
>>
>>> Yes any code that you write in code that you apply with Spark runs in
>>> the executors. You would be running as many HTTP clients as you have
>>> partitions.
>>>
>>> On Thu, May 14, 2020 at 4:31 PM Jerry Vinokurov 
>>> wrote:
>>> >
>>> > I believe that if you do this within the context of an operation that
>>> is already parallelized such as a map, the work will be distributed to
>>> executors and they will do it in parallel. I could be wrong about this as I
>>> never investigated this specific use case, though.
>>> >
>>> > On Thu, May 14, 2020 at 5:24 PM Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>> >>
>>> >> Thanks for the quick response.
>>> >>
>>> >> I am curious to know whether would it be parallel pulling data for
>>> 100+ HTTP request or it will only go on Driver node? the post body would be
>>> part of DataFrame. Think as I have a data frame of employee_id,
>>> employee_name now the http GET call has to be made for each employee_id and
>>> DataFrame is dynamic for each spark job run.
>>> >>
>>> >> Does it make sense?
>>> >>
>>> >> Thanks
>>> >>
>>> >>
>>> >> On Thu, May 14, 2020 at 5:12 PM Jerry Vinokurov <
>>> grapesmo...@gmail.com> wrote:
>>> >>>
>>> >>> Hi Chetan,
>>> >>>
>>> >>> You can pretty much use any client to do this. When I was using
>>> Spark at a previous job, we used OkHttp, but I'm sure there are plenty of
>>> others. In our case, we had a startup phase in which we gathered metadata
>>> via a REST API and then broadcast it to the workers. I think if you need
>>> all the workers to have access to whatever you're getting from the API,
>>> that's the way to do it.
>>> >>>
>>> >>> Jerry
>>> >>>
>>> >>> On Thu, May 14, 2020 at 5:03 PM Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>> >>>>
>>> >>>> Hi Spark Users,
>>> >>>>
>>> >>>> How can I invoke the Rest API call from Spark Code which is not
>>> only running on Spark Driver but distributed / parallel?
>>> >>>>
>>> >>>> Spark with Scala is my tech stack.
>>> >>>>
>>> >>>> Thanks
>>> >>>>
>>> >>>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> http://www.google.com/profiles/grapesmoker
>>> >
>>> >
>>> >
>>> > --
>>> > http://www.google.com/profiles/grapesmoker
>>>
>>


Re: Calling HTTP Rest APIs from Spark Job

2020-05-14 Thread Chetan Khatri
Thanks Sean, Jerry.

The default number of Spark DataFrame partitions is 200, right? Does it have
a relationship with the number of cores? With 8 cores and 4 workers, is it
not the case that I can make only 8 * 4 = 32 HTTP calls? Because in Spark,
"number of partitions = number of cores" is untrue.

Thanks

On Thu, May 14, 2020 at 6:11 PM Sean Owen  wrote:

> Yes any code that you write in code that you apply with Spark runs in
> the executors. You would be running as many HTTP clients as you have
> partitions.
>
> On Thu, May 14, 2020 at 4:31 PM Jerry Vinokurov 
> wrote:
> >
> > I believe that if you do this within the context of an operation that is
> already parallelized such as a map, the work will be distributed to
> executors and they will do it in parallel. I could be wrong about this as I
> never investigated this specific use case, though.
> >
> > On Thu, May 14, 2020 at 5:24 PM Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
> >>
> >> Thanks for the quick response.
> >>
> >> I am curious to know whether would it be parallel pulling data for 100+
> HTTP request or it will only go on Driver node? the post body would be part
> of DataFrame. Think as I have a data frame of employee_id, employee_name
> now the http GET call has to be made for each employee_id and DataFrame is
> dynamic for each spark job run.
> >>
> >> Does it make sense?
> >>
> >> Thanks
> >>
> >>
> >> On Thu, May 14, 2020 at 5:12 PM Jerry Vinokurov 
> wrote:
> >>>
> >>> Hi Chetan,
> >>>
> >>> You can pretty much use any client to do this. When I was using Spark
> at a previous job, we used OkHttp, but I'm sure there are plenty of others.
> In our case, we had a startup phase in which we gathered metadata via a
> REST API and then broadcast it to the workers. I think if you need all the
> workers to have access to whatever you're getting from the API, that's the
> way to do it.
> >>>
> >>> Jerry
> >>>
> >>> On Thu, May 14, 2020 at 5:03 PM Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
> >>>>
> >>>> Hi Spark Users,
> >>>>
> >>>> How can I invoke the Rest API call from Spark Code which is not only
> running on Spark Driver but distributed / parallel?
> >>>>
> >>>> Spark with Scala is my tech stack.
> >>>>
> >>>> Thanks
> >>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> http://www.google.com/profiles/grapesmoker
> >
> >
> >
> > --
> > http://www.google.com/profiles/grapesmoker
>


Re: Calling HTTP Rest APIs from Spark Job

2020-05-14 Thread Chetan Khatri
Thanks for the quick response.

I am curious to know whether it would pull data in parallel for 100+ HTTP
requests, or whether it would only run on the driver node. The post body
would be part of the DataFrame. Think of it as: I have a data frame of
employee_id, employee_name; the HTTP GET call has to be made for each
employee_id, and the DataFrame is dynamic for each Spark job run.

Does it make sense?

Thanks


On Thu, May 14, 2020 at 5:12 PM Jerry Vinokurov 
wrote:

> Hi Chetan,
>
> You can pretty much use any client to do this. When I was using Spark at a
> previous job, we used OkHttp, but I'm sure there are plenty of others. In
> our case, we had a startup phase in which we gathered metadata via a REST
> API and then broadcast it to the workers. I think if you need all the
> workers to have access to whatever you're getting from the API, that's the
> way to do it.
>
> Jerry
>
> On Thu, May 14, 2020 at 5:03 PM Chetan Khatri 
> wrote:
>
>> Hi Spark Users,
>>
>> How can I invoke the Rest API call from Spark Code which is not only
>> running on Spark Driver but distributed / parallel?
>>
>> Spark with Scala is my tech stack.
>>
>> Thanks
>>
>>
>>
>
> --
> http://www.google.com/profiles/grapesmoker
>


Calling HTTP Rest APIs from Spark Job

2020-05-14 Thread Chetan Khatri
Hi Spark Users,

How can I invoke a REST API call from Spark code so that it does not run
only on the Spark driver but is distributed/parallel?

Spark with Scala is my tech stack.

Thanks


Re: XPATH_INT behavior - XML - Function in Spark

2020-05-13 Thread Chetan Khatri
Can anyone please suggest how I can achieve this?

On Tue, May 12, 2020 at 5:35 PM Jeff Evans 
wrote:

> It sounds like you're expecting the XPath expression to evaluate embedded
> Spark SQL expressions?  From the documentation
> <https://spark.apache.org/docs/2.4.5/api/sql/index.html#xpath>, there
> appears to be no reason to expect that to work.
>
> On Tue, May 12, 2020 at 2:09 PM Chetan Khatri 
> wrote:
>
>> Can someone please help.. Thanks in advance.
>>
>> On Mon, May 11, 2020 at 5:29 PM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>
>>> I want to parse xml coming in the query columns and get the value, I am
>>> using *xpath_int* which works as per my requirement but When I am
>>> embedding in the Spark SQL query columns it is failing.
>>>
>>> select timesheet_profile_id,
>>> *xpath_int(timesheet_profile_code, '(/timesheetprofile/weeks/week[*
>>> *td.current_week**]/**td.day**)[1]')*
>>>
>>> *this failed *
>>> where Hardcoded values work for the above scenario
>>>
>>> scala> spark.sql("select timesheet_profile_id,
>>> xpath_int(timesheet_profile_code,
>>> '(/timesheetprofile/weeks/week[2]/friday)[1]') from
>>> TIMESHEET_PROFILE_ATT").show(false)
>>>
>>> Anyone has worked on this? Thanks in advance.
>>>
>>> Thanks
>>> - Chetan
>>>
>>>


Re: XPATH_INT behavior - XML - Function in Spark

2020-05-12 Thread Chetan Khatri
Thank you for the clarification.
What do you suggest to achieve this use case?

On Tue, May 12, 2020 at 5:35 PM Jeff Evans 
wrote:

> It sounds like you're expecting the XPath expression to evaluate embedded
> Spark SQL expressions?  From the documentation
> <https://spark.apache.org/docs/2.4.5/api/sql/index.html#xpath>, there
> appears to be no reason to expect that to work.
>
> On Tue, May 12, 2020 at 2:09 PM Chetan Khatri 
> wrote:
>
>> Can someone please help.. Thanks in advance.
>>
>> On Mon, May 11, 2020 at 5:29 PM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>
>>> I want to parse xml coming in the query columns and get the value, I am
>>> using *xpath_int* which works as per my requirement but When I am
>>> embedding in the Spark SQL query columns it is failing.
>>>
>>> select timesheet_profile_id,
>>> *xpath_int(timesheet_profile_code, '(/timesheetprofile/weeks/week[*
>>> *td.current_week**]/**td.day**)[1]')*
>>>
>>> *this failed *
>>> where Hardcoded values work for the above scenario
>>>
>>> scala> spark.sql("select timesheet_profile_id,
>>> xpath_int(timesheet_profile_code,
>>> '(/timesheetprofile/weeks/week[2]/friday)[1]') from
>>> TIMESHEET_PROFILE_ATT").show(false)
>>>
>>> Anyone has worked on this? Thanks in advance.
>>>
>>> Thanks
>>> - Chetan
>>>
>>>


Re: XPATH_INT behavior - XML - Function in Spark

2020-05-12 Thread Chetan Khatri
Can someone please help.. Thanks in advance.

On Mon, May 11, 2020 at 5:29 PM Chetan Khatri 
wrote:

> Hi Spark Users,
>
> I want to parse xml coming in the query columns and get the value, I am
> using *xpath_int* which works as per my requirement but When I am
> embedding in the Spark SQL query columns it is failing.
>
> select timesheet_profile_id,
> *xpath_int(timesheet_profile_code, '(/timesheetprofile/weeks/week[*
> *td.current_week**]/**td.day**)[1]')*
>
> *this failed *
> where Hardcoded values work for the above scenario
>
> scala> spark.sql("select timesheet_profile_id,
> xpath_int(timesheet_profile_code,
> '(/timesheetprofile/weeks/week[2]/friday)[1]') from
> TIMESHEET_PROFILE_ATT").show(false)
>
> Anyone has worked on this? Thanks in advance.
>
> Thanks
> - Chetan
>
>


XPATH_INT behavior - XML - Function in Spark

2020-05-11 Thread Chetan Khatri
Hi Spark Users,

I want to parse XML coming in a query column and get a value out of it. I am
using *xpath_int*, which works as per my requirement, but when I embed
column references in the Spark SQL query it fails.

select timesheet_profile_id,
*xpath_int(timesheet_profile_code, '(/timesheetprofile/weeks/week[*
*td.current_week**]/**td.day**)[1]')*

*This fails*, whereas hardcoded values work for the above scenario:

scala> spark.sql("select timesheet_profile_id,
xpath_int(timesheet_profile_code,
'(/timesheetprofile/weeks/week[2]/friday)[1]') from
TIMESHEET_PROFILE_ATT").show(false)

Anyone has worked on this? Thanks in advance.

Thanks
- Chetan
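
One workaround sketch: xpath_int expects a constant path (which is why the
hardcoded week/day works), so a per-row path can instead be evaluated in a
small UDF using Java's XPath API, building the path string from the columns.
Column names below follow the thread and are illustrative; error handling is
omitted.

import java.io.ByteArrayInputStream
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.xpath.{XPathConstants, XPathFactory}
import org.apache.spark.sql.functions._

val xpathIntUdf = udf { (xml: String, path: String) =>
  val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    .parse(new ByteArrayInputStream(xml.getBytes("UTF-8")))
  XPathFactory.newInstance().newXPath()
    .evaluate(path, doc, XPathConstants.NUMBER).asInstanceOf[Double].toInt
}

val resultDF = timesheetDF.withColumn("week_day_value",
  xpathIntUdf(col("timesheet_profile_code"),
    concat(lit("(/timesheetprofile/weeks/week["),
      col("current_week").cast("string"),
      lit("]/"), col("day"), lit(")[1]"))))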


Re: AnalysisException - Infer schema for the Parquet path

2020-05-11 Thread Chetan Khatri
Thanks Mich, Nilesh.
What also works is creating a schema object and providing it via .schema(X)
in the spark.read statement.

Thanks a lot.

On Sun, May 10, 2020 at 2:37 AM Nilesh Kuchekar 
wrote:

> Hi Chetan,
>
>   You can have a static parquet file created, and when you
> create a data frame you can pass the location of both the files, with
> option mergeSchema true. This will always fetch you a dataframe even if the
> original file is not present.
>
> Kuchekar, Nilesh
>
>
> On Sat, May 9, 2020 at 10:46 PM Mich Talebzadeh 
> wrote:
>
>> Have you tried catching error when you are creating a dataframe?
>>
>> import scala.util.{Try, Success, Failure}
>> val df = Try(spark.read.
>>  format("com.databricks.spark.xml").
>>option("rootTag", "hierarchy").
>>option("rowTag", "sms_request").
>>load("/tmp/broadcast.xml")) match {
>>case Success(df) => df
>>case Failure(exception) => throw new Exception("foo")
>>   }
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 9 May 2020 at 22:51, Chetan Khatri 
>> wrote:
>>
>>> Hi Spark Users,
>>>
>>> I've a spark job where I am reading the parquet path, and that parquet
>>> path data is generated by other systems, some of the parquet paths doesn't
>>> contains any data which is possible. is there a any way to read the parquet
>>> if no data found I can create a dummy dataframe and go ahead.
>>>
>>> One way is to check path exists like
>>>
>>>  val conf = spark.sparkContext.hadoopConfiguration
>>> val fs = org.apache.hadoop.fs.FileSystem.get(conf)
>>> val currentAreaExists = fs.exists(new
>>> org.apache.hadoop.fs.Path(consumableCurrentArea))
>>>
>>> But I don't want to check this for 300 parquets, just if data doesn't
>>> exist in the parquet path go with the dummy parquet / custom DataFrame
>>>
>>> AnalysisException: u'Unable to infer schema for Parquet. It must be
>>> specified manually.;'
>>>
>>> Thanks
>>>
>>
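
A minimal sketch of that working approach: with an explicit schema,
spark.read does not need to infer anything, so an empty path simply yields
an empty DataFrame instead of the "Unable to infer schema for Parquet"
error. The fields below are illustrative; a missing path, as opposed to an
empty one, would still fail.

import org.apache.spark.sql.types._

val knownSchema = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("name", StringType, nullable = true)
))

val df = spark.read.schema(knownSchema).parquet(consumableCurrentArea)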


AnalysisException - Infer schema for the Parquet path

2020-05-09 Thread Chetan Khatri
Hi Spark Users,

I have a Spark job where I am reading from parquet paths whose data is
generated by other systems; some of the parquet paths don't contain any
data, which is possible. Is there any way to read the parquet such that, if
no data is found, I can create a dummy DataFrame and go ahead?

One way is to check path exists like

 val conf = spark.sparkContext.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val currentAreaExists = fs.exists(new
org.apache.hadoop.fs.Path(consumableCurrentArea))

But I don't want to check this for 300 parquet paths; I just want that, if
data doesn't exist in a parquet path, the job goes ahead with a dummy
parquet / custom DataFrame.

AnalysisException: u'Unable to infer schema for Parquet. It must be
specified manually.;'

Thanks


Re: How can I add extra mounted disk to HDFS

2020-04-30 Thread Chetan Khatri
I have 3 disks now:
disk1 - already has data
disk2 - newly added

I want to shift the data from disk1 to disk2; obviously both are datanode
disks. Please suggest the steps for hot datanode disk migration.

On Wed, Apr 29, 2020 at 2:38 AM JB Data31  wrote:

> Use Hadoop NFSv3 gateway to mount FS.
>
> @*JB*Δ <http://jbigdata.fr/jbigdata/index.html>
>
>
>
> Le mar. 28 avr. 2020 à 23:18, Chetan Khatri 
> a écrit :
>
>> Hi Spark Users,
>>
>> My spark job gave me an error No Space left on the device
>>
>


Re: Read Hive ACID Managed table in Spark

2020-04-28 Thread Chetan Khatri
Hi Amogh,

Thanks for your reply. I'll send you a separate email for the communication.

On Thu, Apr 9, 2020 at 1:11 PM amogh margoor  wrote:

> Sorry for the late reply.
> I can help you with getting started with
> https://github.com/qubole/spark-acid  to read Hive ACID tables. Feel free
> to drop me a mail or raise an issue here:
> https://github.com/qubole/spark-acid/issues
>
> Regards,
> Amogh
>
> On Tue, Mar 10, 2020 at 4:20 AM Chetan Khatri 
> wrote:
>
>> Hi Venkata,
>> Thanks for your reply. I am using HDP 2.6 and I don't think above will
>> work for me, Any other suggestions? Thanks
>>
>> On Thu, Mar 5, 2020 at 8:24 AM venkata naidu udamala <
>> vudamala.gyan...@gmail.com> wrote:
>>
>>> You can try using have warehouse connector
>>> https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html
>>>
>>> On Thu, Mar 5, 2020, 6:51 AM Chetan Khatri 
>>> wrote:
>>>
>>>> Just followup, if anyone has worried on this before
>>>>
>>>> On Wed, Mar 4, 2020 at 12:09 PM Chetan Khatri <
>>>> chetan.opensou...@gmail.com> wrote:
>>>>
>>>>> Hi Spark Users,
>>>>> I want to read Hive ACID managed table data (ORC) in Spark. Can
>>>>> someone help me here.
>>>>> I've tried, https://github.com/qubole/spark-acid but no success.
>>>>>
>>>>> Thanks
>>>>>
>>>>


Re: Unablee to get to_timestamp with Timezone Information

2020-04-28 Thread Chetan Khatri
Thanks Enrico, Magnus

On Thu, Apr 2, 2020 at 11:49 AM Enrico Minack 
wrote:

> Once parsed into a Timestamp the timestamp is store internally as UTC and
> printed as your local timezone (e.g. as defined by
> spark.sql.session.timeZone). Spark is good at hiding timezone information
> from you.
>
> You can get the timezone information via date_format(column, format):
>
> import org.apache.spark.sql.types.TimestampType
> import org.apache.spark.sql.functions._
>
> val sampleDF = Seq("2020-04-11T20:40:00-05:00").toDF("value")
> val timestampDF = sampleDF.select($"value".cast(TimestampType))
> timestampDF.select(date_format($"value",
> "-MM-dd'T'HH:mm:ss")).show(false)
> +-+
> |date_format(value, -MM-dd'T'HH:mm:ss)|
> +-+
> |2020-04-12T03:40:00+0200 |
> +-+
>
> If you want the timezone only, use timestampDF.select(date_format($"value",
> "")).show.
> ++
> |date_format(value, )|
> ++
> |   +0200|
> ++
>
> It all depends how you get the data "downstream". If you go through
> parquet or csv files, they will retain the timezone information. If you go
> through strings, you should format them as above. If you use Dataset.map
> you can access the timestamps as java.sql.Timestamp objects (but that might
> not be necessary):
>
> import java.sql.Timestamp
> case class Times(value: Timestamp)
> timestampDF.as[Times].map(t => t.value.getTimezoneOffset).show
> +-+
> |value|
> +-+
> | -120|
> +-+
>
>
> Enrico
>
>
> Am 31.03.20 um 21:40 schrieb Chetan Khatri:
>
> Sorry misrepresentation the question also. Thanks for your great help.
>
> What I want is the time zone information as it is
> 2020-04-11T20:40:00-05:00 in timestamp datatype. so I can write to
> downstream application as it is. I can correct the lacking UTC offset info.
>
>
> On Tue, Mar 31, 2020 at 1:15 PM Magnus Nilsson  wrote:
>
>> And to answer your question (sorry, read too fast). The string is not in
>> proper ISO8601. Extended form must be used throughout, ie
>> 2020-04-11T20:40:00-05:00, there's a colon (:) lacking in the UTC offset
>> info.
>>
>> br,
>>
>> Magnus
>>
>> On Tue, Mar 31, 2020 at 7:11 PM Magnus Nilsson  wrote:
>>
>>> Timestamps aren't timezoned. If you parse ISO8601 strings they will be
>>> converted to UTC automatically.
>>>
>>> If you parse timestamps without timezone they will converted to the the
>>> timezone the server Spark is running on uses. You can change the timezone
>>> Spark uses with spark.conf.set("spark.sql.session.timeZone", "UTC").
>>> Timestamps represent a point in time, the clock representation of that
>>> instant is dependent on sparks timezone settings both for parsing (non
>>> ISO8601) strings and showing timestamps.
>>>
>>> br,
>>>
>>> Magnus
>>>
>>> On Tue, Mar 31, 2020 at 6:14 PM Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Hi Spark Users,
>>>>
>>>> I am losing the timezone value from below format, I tried couple of
>>>> formats but not able to make it. Can someone throw lights?
>>>>
>>>> scala> val sampleDF = Seq("2020-04-11T20:40:00-0500").toDF("value")
>>>> sampleDF: org.apache.spark.sql.DataFrame = [value: string]
>>>>
>>>> scala> sampleDF.select('value, to_timestamp('value,
>>>> "-MM-dd\'T\'HH:mm:ss")).show(false)
>>>>
>>>> +++
>>>> |value   |to_timestamp(`value`,
>>>> '-MM-dd\'T\'HH:mm:ss')|
>>>>
>>>> +++
>>>> |2020-04-11T20:40:00-0500|2020-04-11 20:40:00
>>>>   |
>>>>
>>>> +++
>>>>
>>>> Thanks
>>>>
>>>
>


How can I add extra mounted disk to HDFS

2020-04-28 Thread Chetan Khatri
Hi Spark Users,

My Spark job gave me a "No space left on device" error.


Re: Unablee to get to_timestamp with Timezone Information

2020-03-31 Thread Chetan Khatri
Sorry, I misrepresented the question. Thanks for your great help.

What I want is the timezone information as-is, 2020-04-11T20:40:00-05:00, in
a timestamp datatype, so I can write it to the downstream application as-is.
I can correct the lacking UTC offset info.


On Tue, Mar 31, 2020 at 1:15 PM Magnus Nilsson  wrote:

> And to answer your question (sorry, read too fast). The string is not in
> proper ISO8601. Extended form must be used throughout, ie
> 2020-04-11T20:40:00-05:00, there's a colon (:) lacking in the UTC offset
> info.
>
> br,
>
> Magnus
>
> On Tue, Mar 31, 2020 at 7:11 PM Magnus Nilsson  wrote:
>
>> Timestamps aren't timezoned. If you parse ISO8601 strings they will be
>> converted to UTC automatically.
>>
>> If you parse timestamps without timezone they will converted to the the
>> timezone the server Spark is running on uses. You can change the timezone
>> Spark uses with spark.conf.set("spark.sql.session.timeZone", "UTC").
>> Timestamps represent a point in time, the clock representation of that
>> instant is dependent on sparks timezone settings both for parsing (non
>> ISO8601) strings and showing timestamps.
>>
>> br,
>>
>> Magnus
>>
>> On Tue, Mar 31, 2020 at 6:14 PM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>
>>> I am losing the timezone value from below format, I tried couple of
>>> formats but not able to make it. Can someone throw lights?
>>>
>>> scala> val sampleDF = Seq("2020-04-11T20:40:00-0500").toDF("value")
>>> sampleDF: org.apache.spark.sql.DataFrame = [value: string]
>>>
>>> scala> sampleDF.select('value, to_timestamp('value,
>>> "-MM-dd\'T\'HH:mm:ss")).show(false)
>>>
>>> +++
>>> |value   |to_timestamp(`value`,
>>> '-MM-dd\'T\'HH:mm:ss')|
>>>
>>> +++
>>> |2020-04-11T20:40:00-0500|2020-04-11 20:40:00
>>>   |
>>>
>>> +++
>>>
>>> Thanks
>>>
>>


Unablee to get to_timestamp with Timezone Information

2020-03-31 Thread Chetan Khatri
Hi Spark Users,

I am losing the timezone value with the format below; I tried a couple of
formats but was not able to make it work. Can someone throw some light on this?

scala> val sampleDF = Seq("2020-04-11T20:40:00-0500").toDF("value")
sampleDF: org.apache.spark.sql.DataFrame = [value: string]

scala> sampleDF.select('value, to_timestamp('value, "yyyy-MM-dd\'T\'HH:mm:ss")).show(false)
+------------------------+------------------------------------------------+
|value                   |to_timestamp(`value`, 'yyyy-MM-dd\'T\'HH:mm:ss')|
+------------------------+------------------------------------------------+
|2020-04-11T20:40:00-0500|2020-04-11 20:40:00                             |
+------------------------+------------------------------------------------+

Thanks
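
Building on Enrico's explanation (in the thread above) that Spark timestamps
carry no offset, one pragmatic sketch is to keep the original ISO-8601
string for the downstream system and derive both an instant and the offset
as separate columns. Pattern letters assume Spark 2.x / SimpleDateFormat.

import org.apache.spark.sql.functions._

val sampleDF = Seq("2020-04-11T20:40:00-0500").toDF("value")

val parsedDF = sampleDF
  .withColumn("event_ts", to_timestamp(col("value"), "yyyy-MM-dd'T'HH:mm:ssZ"))  // parsed instant
  .withColumn("utc_offset", regexp_extract(col("value"), "([+-]\\d{4})$", 1))    // e.g. -0500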


Re: Best Practice: Evaluate Expression from Spark DataFrame Column

2020-03-28 Thread Chetan Khatri
Is there a way to pass a column's value as a String to the expr function in Spark?

val sampleDF = Seq(
  (8, 1, "bat", "NUM IS NOT NULL AND FLAG!=0"),
  (64, 0, "mouse", "NUM IS NOT NULL AND FLAG!=0"),
  (-27, 1, "horse" , "NUM IS NOT NULL AND FLAG!=0"),
  (1, 0, "miki", "NUM IS NOT NULL AND FLAG!=0 AND WORD == 'MIKI'")
).toDF("num", "flag", "word", "expression")

val derivedDF = sampleDF.withColumn("status",
expr(sampleDF.col("expression").as[String].toString()))
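
expr() needs a literal SQL string, so a Column cannot be passed to it
directly. When the set of distinct expressions is small, one workaround
sketch is to evaluate each distinct expression once and pick the right
result per row with when() (column names follow the sample above;
spark.implicits._ is assumed):

import org.apache.spark.sql.functions._

val expressions = sampleDF.select("expression").distinct().as[String].collect()

val statusCol = expressions.foldLeft(lit(null).cast("boolean")) { (acc, e) =>
  when(col("expression") === e, expr(e)).otherwise(acc)
}

val derivedDF = sampleDF.withColumn("status", statusCol)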


On Fri, Mar 27, 2020 at 10:35 PM Chetan Khatri 
wrote:

> Hi Spark Users,
>
> I want to evaluate expression from dataframe column values on other
> columns in the same dataframe for each row. Please suggest best approach to
> deal with this given that not impacting the performance of the job.
>
> Thanks
>
> Sample code:
>
> val sampleDF = Seq(
>   (8, 1, "bat", "NUM IS NOT NULL AND FLAG IS NOT 0"),
>   (64, 0, "mouse", "NUM IS NOT NULL AND FLAG IS NOT 0"),
>   (-27, 1, "horse" , "NUM IS NOT NULL AND FLAG IS NOT 0"),
>   (null, 0, "miki", "NUM IS NOT NULL AND FLAG IS NOT 1 AND WORD IS 'MIKI'")
> ).toDF("num", "flag", "word", "expression")
>
> val derivedDF = sampleDF.withColumn("status", sampleDF.col("expression"))
>
>


Best Practice: Evaluate Expression from Spark DataFrame Column

2020-03-27 Thread Chetan Khatri
Hi Spark Users,

I want to evaluate an expression coming from a DataFrame column's values
against other columns in the same DataFrame, for each row. Please suggest
the best approach to deal with this, given that it should not impact the
performance of the job.

Thanks

Sample code:

val sampleDF = Seq(
  (8, 1, "bat", "NUM IS NOT NULL AND FLAG IS NOT 0"),
  (64, 0, "mouse", "NUM IS NOT NULL AND FLAG IS NOT 0"),
  (-27, 1, "horse" , "NUM IS NOT NULL AND FLAG IS NOT 0"),
  (null, 0, "miki", "NUM IS NOT NULL AND FLAG IS NOT 1 AND WORD IS 'MIKI'")
).toDF("num", "flag", "word", "expression")

val derivedDF = sampleDF.withColumn("status", sampleDF.col("expression"))


Re: Read Hive ACID Managed table in Spark

2020-03-10 Thread Chetan Khatri
Hi Venkata,
Thanks for your reply. I am using HDP 2.6 and I don't think the above will
work for me. Any other suggestions? Thanks

On Thu, Mar 5, 2020 at 8:24 AM venkata naidu udamala <
vudamala.gyan...@gmail.com> wrote:

> You can try using have warehouse connector
> https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html
>
> On Thu, Mar 5, 2020, 6:51 AM Chetan Khatri 
> wrote:
>
>> Just followup, if anyone has worried on this before
>>
>> On Wed, Mar 4, 2020 at 12:09 PM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>> I want to read Hive ACID managed table data (ORC) in Spark. Can someone
>>> help me here.
>>> I've tried, https://github.com/qubole/spark-acid but no success.
>>>
>>> Thanks
>>>
>>


Re: Read Hive ACID Managed table in Spark

2020-03-05 Thread Chetan Khatri
Just following up, in case anyone has worked on this before.

On Wed, Mar 4, 2020 at 12:09 PM Chetan Khatri 
wrote:

> Hi Spark Users,
> I want to read Hive ACID managed table data (ORC) in Spark. Can someone
> help me here.
> I've tried, https://github.com/qubole/spark-acid but no success.
>
> Thanks
>


Read Hive ACID Managed table in Spark

2020-03-04 Thread Chetan Khatri
Hi Spark Users,
I want to read Hive ACID managed table data (ORC) in Spark. Can someone
help me here.
I've tried, https://github.com/qubole/spark-acid but no success.

Thanks


Re: Compute the Hash of each row in new column

2020-03-02 Thread Chetan Khatri
Thanks Enrico. I meant one hash of each single row in an extra column,
something like this:

val newDs = typedRows.withColumn("hash", hash(typedRows.columns.map(col): _*))

On Mon, Mar 2, 2020 at 3:51 PM Enrico Minack  wrote:

> Well, then apply md5 on all columns:
>
> ds.select(ds.columns.map(col) ++ ds.columns.map(column =>
> md5(col(column)).as(s"$column hash")): _*).show(false)
>
> Enrico
>
> Am 02.03.20 um 11:10 schrieb Chetan Khatri:
>
> Thanks Enrico
> I want to compute hash of all the columns value in the row.
>
> On Fri, Feb 28, 2020 at 7:28 PM Enrico Minack 
> wrote:
>
>> This computes the md5 hash of a given column id of Dataset ds:
>>
>> ds.withColumn("id hash", md5($"id")).show(false)
>>
>> Test with this Dataset ds:
>>
>> import org.apache.spark.sql.types._
>> val ds = spark.range(10).select($"id".cast(StringType))
>>
>> Available are md5, sha, sha1, sha2 and hash:
>> https://spark.apache.org/docs/2.4.5/api/sql/index.html
>>
>> Enrico
>>
>>
>> Am 28.02.20 um 13:56 schrieb Chetan Khatri:
>> > Hi Spark Users,
>> > How can I compute Hash of each row and store in new column at
>> > Dataframe, could someone help me.
>> >
>> > Thanks
>>
>>
>>
>


Re: Compute the Hash of each row in new column

2020-03-02 Thread Chetan Khatri
Thanks Enrico
I want to compute a hash of all the columns' values in the row.

On Fri, Feb 28, 2020 at 7:28 PM Enrico Minack 
wrote:

> This computes the md5 hash of a given column id of Dataset ds:
>
> ds.withColumn("id hash", md5($"id")).show(false)
>
> Test with this Dataset ds:
>
> import org.apache.spark.sql.types._
> val ds = spark.range(10).select($"id".cast(StringType))
>
> Available are md5, sha, sha1, sha2 and hash:
> https://spark.apache.org/docs/2.4.5/api/sql/index.html
>
> Enrico
>
>
> Am 28.02.20 um 13:56 schrieb Chetan Khatri:
> > Hi Spark Users,
> > How can I compute Hash of each row and store in new column at
> > Dataframe, could someone help me.
> >
> > Thanks
>
>
>


Compute the Hash of each row in new column

2020-02-28 Thread Chetan Khatri
Hi Spark Users,
How can I compute a hash of each row and store it in a new column of the
DataFrame? Could someone help me?

Thanks
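
A minimal sketch combining the two suggestions in this thread: hash() gives
a fast Murmur3 integer over all columns, while md5 over a concatenation
gives a hex digest (df is illustrative; note that concat_ws skips nulls, so
rows differing only in null placement can collide on the md5 variant):

import org.apache.spark.sql.functions._

val withHashDF = df
  .withColumn("row_hash", hash(df.columns.map(col): _*))
  .withColumn("row_md5", md5(concat_ws("|", df.columns.map(col): _*)))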


Apache Spark Use cases - my first talk

2019-12-25 Thread Chetan Khatri
Hi Spark Users,

Thank you for all support over the mailing list. Contributors - thanks for
your all contributions.
This is my first 5 mins talk with Apache Spark -
https://youtu.be/bBqItpgT8xQ

Thanks.


Re: How more than one spark job can write to same partition in the parquet file

2019-12-11 Thread Chetan Khatri
Thanks. If you can share an alternative design change, I would love to hear
about it from you.

On Wed, Dec 11, 2019 at 9:34 PM ayan guha  wrote:

> No we faced problem with that setup.
>
> On Thu, 12 Dec 2019 at 11:14 am, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Hi Spark Users,
>> would that be possible to write to same partition to the parquet file
>> through concurrent two spark jobs with different spark session.
>>
>> thanks
>>
> --
> Best Regards,
> Ayan Guha
>


How more than one spark job can write to same partition in the parquet file

2019-12-11 Thread Chetan Khatri
Hi Spark Users,
Would it be possible to write to the same partition of a parquet file
through two concurrent Spark jobs with different Spark sessions?

thanks


Re: Spark - configuration setting doesn't work

2019-10-29 Thread Chetan Khatri
Ok, thanks. I wanted to confirm that.

On Sun, Oct 27, 2019 at 12:55 PM hemant singh  wrote:

> You should add the configurations while creating the session, I don’t
> think you can override it once the session is created. Few are though.
>
> Thanks,
> Hemant
>
> On Sun, 27 Oct 2019 at 11:02 AM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Could someone please help me.
>>
>> On Thu, Oct 17, 2019 at 7:29 PM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hi Users,
>>>
>>> I am setting spark configuration in below way;
>>>
>>> val spark = SparkSession.builder().appName(APP_NAME).getOrCreate()
>>>
>>> spark.conf.set("spark.speculation", "false")
>>> spark.conf.set("spark.broadcast.compress", "true")
>>> spark.conf.set("spark.sql.broadcastTimeout", "36000")
>>> spark.conf.set("spark.network.timeout", "2500s")
>>> spark.conf.set("spark.serializer", 
>>> "org.apache.spark.serializer.KryoSerializer")
>>> spark.conf.set("spark.driver.memory", "10g")
>>> spark.conf.set("spark.executor.memory", "10g")
>>>
>>> import spark.implicits._
>>>
>>>
>>> and submitting spark job with spark - submit. but none of the above 
>>> configuration is
>>>
>>> getting reflected to the job, I have checked at Spark-UI.
>>>
>>> I know setting up like this while creation of spark object, it's working 
>>> well.
>>>
>>>
>>> val spark = SparkSession.builder().appName(APP_NAME)
>>>   .config("spark.network.timeout", "1500s")
>>>   .config("spark.broadcast.compress", "true")
>>>   .config("spark.sql.broadcastTimeout", "36000")
>>>   .getOrCreate()
>>>
>>> import spark.implicits._
>>>
>>>
>>> Can someone please throw light?
>>>
>>>


Re: Spark Cluster over yarn cluster monitoring

2019-10-29 Thread Chetan Khatri
Thanks Jörn

On Sun, Oct 27, 2019 at 8:01 AM Jörn Franke  wrote:

> Use yarn queues:
>
>
> https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
>
> Am 27.10.2019 um 06:41 schrieb Chetan Khatri  >:
>
> 
> Could someone please help me to understand better..
>
> On Thu, Oct 17, 2019 at 7:41 PM Chetan Khatri 
> wrote:
>
>> Hi Users,
>>
>> I do submit *X* number of jobs with Airflow to Yarn as a part of
>> workflow for *Y *customer. I could potentially run workflow for customer *Z
>> *but I need to check that how much resources are available over the
>> cluster so jobs for next customer should start.
>>
>> Could you please tell what is the best way to handle this. Currently, I
>> am just checking availableMB > 100 then trigger next Airflow DAG over Yarn.
>>
>> GET http://rm-http-address:port/ws/v1/cluster/metrics
>>
>> Thanks.
>>
>>


Re: Spark Cluster over yarn cluster monitoring

2019-10-26 Thread Chetan Khatri
Could someone please help me understand this better?

On Thu, Oct 17, 2019 at 7:41 PM Chetan Khatri 
wrote:

> Hi Users,
>
> I do submit *X* number of jobs with Airflow to Yarn as a part of workflow
> for *Y *customer. I could potentially run workflow for customer *Z *but I
> need to check that how much resources are available over the cluster so
> jobs for next customer should start.
>
> Could you please tell what is the best way to handle this. Currently, I am
> just checking availableMB > 100 then trigger next Airflow DAG over Yarn.
>
> GET http://rm-http-address:port/ws/v1/cluster/metrics
>
> Thanks.
>
>


Re: Spark - configuration setting doesn't work

2019-10-26 Thread Chetan Khatri
Could someone please help me.

On Thu, Oct 17, 2019 at 7:29 PM Chetan Khatri 
wrote:

> Hi Users,
>
> I am setting spark configuration in below way;
>
> val spark = SparkSession.builder().appName(APP_NAME).getOrCreate()
>
> spark.conf.set("spark.speculation", "false")
> spark.conf.set("spark.broadcast.compress", "true")
> spark.conf.set("spark.sql.broadcastTimeout", "36000")
> spark.conf.set("spark.network.timeout", "2500s")
> spark.conf.set("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
> spark.conf.set("spark.driver.memory", "10g")
> spark.conf.set("spark.executor.memory", "10g")
>
> import spark.implicits._
>
>
> and submitting spark job with spark - submit. but none of the above 
> configuration is
>
> getting reflected to the job, I have checked at Spark-UI.
>
> I know setting up like this while creation of spark object, it's working well.
>
>
> val spark = SparkSession.builder().appName(APP_NAME)
>   .config("spark.network.timeout", "1500s")
>   .config("spark.broadcast.compress", "true")
>   .config("spark.sql.broadcastTimeout", "36000")
>   .getOrCreate()
>
> import spark.implicits._
>
>
> Can someone please throw light?
>
>


Spark Cluster over yarn cluster monitoring

2019-10-17 Thread Chetan Khatri
Hi Users,

I submit *X* number of jobs with Airflow to YARN as part of a workflow for
customer *Y*. I could potentially run the workflow for customer *Z*, but I
need to check how many resources are available over the cluster so that the
jobs for the next customer can start.

Could you please tell me the best way to handle this? Currently, I am just
checking availableMB > 100 and then triggering the next Airflow DAG over YARN.

GET http://rm-http-address:port/ws/v1/cluster/metrics

Thanks.


Re: Control Sqoop job from Spark job

2019-10-17 Thread Chetan Khatri
Shyam, as Mark said, if we boost the parallelism with Spark we can reach the
performance of Sqoop or do better than that.

On Tue, Sep 3, 2019 at 6:35 PM Shyam P  wrote:

> J Franke,
>  Leave alone sqoop , I am just asking about spark in ETL of Oracle ...?
>
> Thanks,
> Shyam
>
>>


Spark - configuration setting doesn't work

2019-10-17 Thread Chetan Khatri
Hi Users,

I am setting the Spark configuration in the below way:

val spark = SparkSession.builder().appName(APP_NAME).getOrCreate()

spark.conf.set("spark.speculation", "false")
spark.conf.set("spark.broadcast.compress", "true")
spark.conf.set("spark.sql.broadcastTimeout", "36000")
spark.conf.set("spark.network.timeout", "2500s")
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.driver.memory", "10g")
spark.conf.set("spark.executor.memory", "10g")

import spark.implicits._


and submitting the Spark job with spark-submit, but none of the above
configuration is reflected in the job; I have checked in the Spark UI.

I know that setting these while creating the Spark object works well:


val spark = SparkSession.builder().appName(APP_NAME)
  .config("spark.network.timeout", "1500s")
  .config("spark.broadcast.compress", "true")
  .config("spark.sql.broadcastTimeout", "36000")
  .getOrCreate()

import spark.implicits._


Can someone please throw some light on this?
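
A sketch reflecting Hemant's point: pass these settings when the session is
built (or as --conf / --driver-memory flags to spark-submit); in particular,
spark.driver.memory has to be known before the driver JVM starts, so setting
it with spark.conf.set() at runtime has no effect.

val spark = SparkSession.builder().appName(APP_NAME)
  .config("spark.speculation", "false")
  .config("spark.broadcast.compress", "true")
  .config("spark.sql.broadcastTimeout", "36000")
  .config("spark.network.timeout", "2500s")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.executor.memory", "10g")
  .getOrCreate()   // spark.driver.memory is best passed as --driver-memory on spark-submit

import spark.implicits._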


Re: Control Sqoop job from Spark job

2019-09-02 Thread Chetan Khatri
Hi Mich, the JDBC connection, which is similar to Sqoop, takes time and
cannot achieve parallelism.

On Sat, Aug 31, 2019 at 12:17 PM Mich Talebzadeh 
wrote:

> Spark is an excellent ETL tool to lift data from source and put it in
> target. Spark uses JDBC connection similar to Sqoop. I don't see the need
> for Sqoop with Spark here.
>
> Where is the source (Oracle MSSQL, etc) and target (Hive?) here
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 29 Aug 2019 at 21:01, Chetan Khatri 
> wrote:
>
>> Hi Users,
>> I am launching a Sqoop job from Spark job and would like to FAIL Spark
>> job if Sqoop job fails.
>>
>> def executeSqoopOriginal(serverName: String, schemaName: String, username: 
>> String, password: String,
>>  query: String, splitBy: String, fetchSize: Int, numMappers: 
>> Int, targetDir: String, jobName: String, dateColumns: String) = {
>>
>>   val connectionString = "jdbc:sqlserver://" + serverName + ";" + 
>> "databaseName=" + schemaName
>>   var parameters = Array("import")
>>   parameters = parameters :+ "-Dmapreduce.job.user.classpath.first=true"
>>   parameters = parameters :+ "--connect"
>>   parameters = parameters :+ connectionString
>>   parameters = parameters :+ "--mapreduce-job-name"
>>   parameters = parameters :+ jobName
>>   parameters = parameters :+ "--username"
>>   parameters = parameters :+ username
>>   parameters = parameters :+ "--password"
>>   parameters = parameters :+ password
>>   parameters = parameters :+ "--hadoop-mapred-home"
>>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop-mapreduce/"
>>   parameters = parameters :+ "--hadoop-home"
>>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop/"
>>   parameters = parameters :+ "--query"
>>   parameters = parameters :+ query
>>   parameters = parameters :+ "--split-by"
>>   parameters = parameters :+ splitBy
>>   parameters = parameters :+ "--fetch-size"
>>   parameters = parameters :+ fetchSize.toString
>>   parameters = parameters :+ "--num-mappers"
>>   parameters = parameters :+ numMappers.toString
>>   if (dateColumns.length() > 0) {
>> parameters = parameters :+ "--map-column-java"
>> parameters = parameters :+ dateColumns
>>   }
>>   parameters = parameters :+ "--target-dir"
>>   parameters = parameters :+ targetDir
>>   parameters = parameters :+ "--delete-target-dir"
>>   parameters = parameters :+ "--as-avrodatafile"
>>
>> }
>>
>>


Re: Control Sqoop job from Spark job

2019-09-02 Thread Chetan Khatri
Hi Chris, thanks for the email. You're right, but the Sqoop job gets
launched based on DataFrame values computed in the Spark job. Certainly it can be
isolated and broken out.

On Sat, Aug 31, 2019 at 8:07 AM Chris Teoh  wrote:

> I'd say this is an uncommon approach, could you use a workflow/scheduling
> system to call Sqoop outside of Spark? Spark is usually multiprocess
> distributed so putting in this Sqoop job in the Spark code seems to imply
> you want to run Sqoop first, then Spark. If you're really insistent on
> this, call it from the driver using Sqoop Java APIs.
>
> On Fri, 30 Aug 2019 at 06:02, Chetan Khatri 
> wrote:
>
>> Sorry,
>> I call sqoop job from above function. Can you help me to resolve this.
>>
>> Thanks
>>
>> On Fri, Aug 30, 2019 at 1:31 AM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hi Users,
>>> I am launching a Sqoop job from Spark job and would like to FAIL Spark
>>> job if Sqoop job fails.
>>>
>>> def executeSqoopOriginal(serverName: String, schemaName: String, username: 
>>> String, password: String,
>>>  query: String, splitBy: String, fetchSize: Int, 
>>> numMappers: Int, targetDir: String, jobName: String, dateColumns: String) = 
>>> {
>>>
>>>   val connectionString = "jdbc:sqlserver://" + serverName + ";" + 
>>> "databaseName=" + schemaName
>>>   var parameters = Array("import")
>>>   parameters = parameters :+ "-Dmapreduce.job.user.classpath.first=true"
>>>   parameters = parameters :+ "--connect"
>>>   parameters = parameters :+ connectionString
>>>   parameters = parameters :+ "--mapreduce-job-name"
>>>   parameters = parameters :+ jobName
>>>   parameters = parameters :+ "--username"
>>>   parameters = parameters :+ username
>>>   parameters = parameters :+ "--password"
>>>   parameters = parameters :+ password
>>>   parameters = parameters :+ "--hadoop-mapred-home"
>>>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop-mapreduce/"
>>>   parameters = parameters :+ "--hadoop-home"
>>>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop/"
>>>   parameters = parameters :+ "--query"
>>>   parameters = parameters :+ query
>>>   parameters = parameters :+ "--split-by"
>>>   parameters = parameters :+ splitBy
>>>   parameters = parameters :+ "--fetch-size"
>>>   parameters = parameters :+ fetchSize.toString
>>>   parameters = parameters :+ "--num-mappers"
>>>   parameters = parameters :+ numMappers.toString
>>>   if (dateColumns.length() > 0) {
>>> parameters = parameters :+ "--map-column-java"
>>> parameters = parameters :+ dateColumns
>>>   }
>>>   parameters = parameters :+ "--target-dir"
>>>   parameters = parameters :+ targetDir
>>>   parameters = parameters :+ "--delete-target-dir"
>>>   parameters = parameters :+ "--as-avrodatafile"
>>>
>>> }
>>>
>>>
>
> --
> Chris
>


Re: Control Sqoop job from Spark job

2019-08-29 Thread Chetan Khatri
Sorry - I call the Sqoop job from the above function. Can you help me resolve this?

Thanks

On Fri, Aug 30, 2019 at 1:31 AM Chetan Khatri 
wrote:

> Hi Users,
> I am launching a Sqoop job from Spark job and would like to FAIL Spark job
> if Sqoop job fails.
>
> def executeSqoopOriginal(serverName: String, schemaName: String, username: 
> String, password: String,
>  query: String, splitBy: String, fetchSize: Int, numMappers: 
> Int, targetDir: String, jobName: String, dateColumns: String) = {
>
>   val connectionString = "jdbc:sqlserver://" + serverName + ";" + 
> "databaseName=" + schemaName
>   var parameters = Array("import")
>   parameters = parameters :+ "-Dmapreduce.job.user.classpath.first=true"
>   parameters = parameters :+ "--connect"
>   parameters = parameters :+ connectionString
>   parameters = parameters :+ "--mapreduce-job-name"
>   parameters = parameters :+ jobName
>   parameters = parameters :+ "--username"
>   parameters = parameters :+ username
>   parameters = parameters :+ "--password"
>   parameters = parameters :+ password
>   parameters = parameters :+ "--hadoop-mapred-home"
>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop-mapreduce/"
>   parameters = parameters :+ "--hadoop-home"
>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop/"
>   parameters = parameters :+ "--query"
>   parameters = parameters :+ query
>   parameters = parameters :+ "--split-by"
>   parameters = parameters :+ splitBy
>   parameters = parameters :+ "--fetch-size"
>   parameters = parameters :+ fetchSize.toString
>   parameters = parameters :+ "--num-mappers"
>   parameters = parameters :+ numMappers.toString
>   if (dateColumns.length() > 0) {
> parameters = parameters :+ "--map-column-java"
> parameters = parameters :+ dateColumns
>   }
>   parameters = parameters :+ "--target-dir"
>   parameters = parameters :+ targetDir
>   parameters = parameters :+ "--delete-target-dir"
>   parameters = parameters :+ "--as-avrodatafile"
>
> }
>
>


Control Sqoop job from Spark job

2019-08-29 Thread Chetan Khatri
Hi Users,
I am launching a Sqoop job from Spark job and would like to FAIL Spark job
if Sqoop job fails.

def executeSqoopOriginal(serverName: String, schemaName: String,
username: String, password: String,
 query: String, splitBy: String, fetchSize: Int,
numMappers: Int, targetDir: String, jobName: String, dateColumns:
String) = {

  val connectionString = "jdbc:sqlserver://" + serverName + ";" +
"databaseName=" + schemaName
  var parameters = Array("import")
  parameters = parameters :+ "-Dmapreduce.job.user.classpath.first=true"
  parameters = parameters :+ "--connect"
  parameters = parameters :+ connectionString
  parameters = parameters :+ "--mapreduce-job-name"
  parameters = parameters :+ jobName
  parameters = parameters :+ "--username"
  parameters = parameters :+ username
  parameters = parameters :+ "--password"
  parameters = parameters :+ password
  parameters = parameters :+ "--hadoop-mapred-home"
  parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop-mapreduce/"
  parameters = parameters :+ "--hadoop-home"
  parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop/"
  parameters = parameters :+ "--query"
  parameters = parameters :+ query
  parameters = parameters :+ "--split-by"
  parameters = parameters :+ splitBy
  parameters = parameters :+ "--fetch-size"
  parameters = parameters :+ fetchSize.toString
  parameters = parameters :+ "--num-mappers"
  parameters = parameters :+ numMappers.toString
  if (dateColumns.length() > 0) {
parameters = parameters :+ "--map-column-java"
parameters = parameters :+ dateColumns
  }
  parameters = parameters :+ "--target-dir"
  parameters = parameters :+ targetDir
  parameters = parameters :+ "--delete-target-dir"
  parameters = parameters :+ "--as-avrodatafile"

}
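
A hedged sketch of one way to execute the parameter array built above and make
the Spark job fail when Sqoop fails, along the lines of the Sqoop Java API
suggestion in the replies; it assumes the Sqoop client jars are on the driver
classpath:

import org.apache.hadoop.conf.Configuration
import org.apache.sqoop.Sqoop

// Run the Sqoop tool with the parameters built above and propagate failures.
def runSqoopOrFail(parameters: Array[String]): Unit = {
  val exitCode = Sqoop.runTool(parameters, new Configuration())
  if (exitCode != 0) {
    // Throwing here fails the surrounding Spark driver/application as well.
    throw new RuntimeException(s"Sqoop job failed with exit code $exitCode")
  }
}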


Re: Kafka Topic to Parquet HDFS with Structured Streaming

2019-06-10 Thread Chetan Khatri
Hello Deng, thank you for your email.
The issue was with the Spark - Hadoop / HDFS configuration settings.

Thanks

On Mon, Jun 10, 2019 at 5:28 AM Deng Ching-Mallete 
wrote:

> Hi Chetan,
>
> Best to check if the user account that you're using to run the job has
> permission to write to the path in HDFS. I would suggest to write the
> parquet files to a different path, perhaps to a project space or user home,
> rather than at the root directory.
>
> HTH,
> Deng
>
> On Sat, Jun 8, 2019 at 8:00 AM Chetan Khatri 
> wrote:
>
>> Hello Dear Spark Users,
>>
>> I am trying to write data from Kafka Topic to Parquet HDFS with
>> Structured Streaming but Getting failures. Please do help.
>>
>> val spark: SparkSession = 
>> SparkSession.builder().appName("DemoSparkKafka").getOrCreate()
>> import spark.implicits._
>> val dataFromTopicDF = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "test")
>>   .option("startingOffsets", "earliest")
>>   .load()
>>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>>
>> logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.")
>> val topicQuery = dataFromTopicDF.writeStream
>>   .format("console")
>>   .option("truncate", false)
>>   .option("checkpointLocation", "/tmp/checkpoint")
>>   .trigger(Trigger.ProcessingTime(10.seconds))
>>   .start()
>>
>> topicQuery.awaitTermination()
>> topicQuery.stop()
>>
>>
>> Above code is working well but when I am trying to write to Parquet at HDFS 
>> getting exceptions.
>>
>>
>> logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS")
>>
>> val parquetQuery = dataFromTopicDF.writeStream
>> .format("parquet")
>> .option("startingOffsets", "earliest")
>> .option("checkpointLocation", "/tmp/checkpoint")
>> .option("path", "/sample-topic")
>> .start()
>>
>> parquetQuery.awaitTermination()
>> parquetQuery.stop()
>>
>>
>> *Exception Details:*
>>
>>
>> Exception in thread "main" java.io.IOException: mkdir of 
>> /sample-topic/_spark_metadata failed
>>  at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067)
>>  at 
>> org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176)
>>  at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197)
>>  at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730)
>>  at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726)
>>  at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>>  at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733)
>>  at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378)
>>  at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:66)
>>  at 
>> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46)
>>  at 
>> org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:85)
>>  at 
>> org.apache.spark.sql.execution.streaming.FileStreamSink.(FileStreamSink.scala:98)
>>  at 
>> org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317)
>>  at 
>> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293)
>>  at 
>> com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35)
>>  at 
>> com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7)
>>  at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>  at 
>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>  at scala.App$$anonfun$main$1.apply(App.scala:76)
>>  at scala.App$$anonfun$main$1.apply(App.scala:76)
>>  at scala.collection.immutable.List.foreach(List.scala:381)
>>  at 
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>  at scala.App$class.main(App.scala:76)
>>  at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7)
>>  at com.dynasty.poc.DemoSparkKa

Re: Kafka Topic to Parquet HDFS with Structured Streaming

2019-06-07 Thread Chetan Khatri
Also, does anyone have an idea how to resolve this issue -
https://stackoverflow.com/questions/56390492/spark-metadata-0-doesnt-exist-while-compacting-batch-9-structured-streaming-er

On Fri, Jun 7, 2019 at 5:59 PM Chetan Khatri 
wrote:

> Hello Dear Spark Users,
>
> I am trying to write data from Kafka Topic to Parquet HDFS with Structured
> Streaming but Getting failures. Please do help.
>
> val spark: SparkSession = 
> SparkSession.builder().appName("DemoSparkKafka").getOrCreate()
> import spark.implicits._
> val dataFromTopicDF = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test")
>   .option("startingOffsets", "earliest")
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>
> logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.")
> val topicQuery = dataFromTopicDF.writeStream
>   .format("console")
>   .option("truncate", false)
>   .option("checkpointLocation", "/tmp/checkpoint")
>   .trigger(Trigger.ProcessingTime(10.seconds))
>   .start()
>
> topicQuery.awaitTermination()
> topicQuery.stop()
>
>
> Above code is working well but when I am trying to write to Parquet at HDFS 
> getting exceptions.
>
>
> logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS")
>
> val parquetQuery = dataFromTopicDF.writeStream
> .format("parquet")
> .option("startingOffsets", "earliest")
> .option("checkpointLocation", "/tmp/checkpoint")
> .option("path", "/sample-topic")
> .start()
>
> parquetQuery.awaitTermination()
> parquetQuery.stop()
>
>
> *Exception Details:*
>
>
> Exception in thread "main" java.io.IOException: mkdir of 
> /sample-topic/_spark_metadata failed
>   at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067)
>   at 
> org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176)
>   at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197)
>   at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730)
>   at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726)
>   at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>   at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:66)
>   at 
> org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46)
>   at 
> org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:85)
>   at 
> org.apache.spark.sql.execution.streaming.FileStreamSink.(FileStreamSink.scala:98)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293)
>   at 
> com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35)
>   at 
> com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7)
>   at com.dynasty.poc.DemoSparkKafka.main(DemoSparkKafka.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>   at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
>   at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> Thanks
>
>


Kafka Topic to Parquet HDFS with Structured Streaming

2019-06-07 Thread Chetan Khatri
Hello Dear Spark Users,

I am trying to write data from a Kafka topic to Parquet on HDFS with Structured
Streaming but am getting failures. Please do help.

val spark: SparkSession =
SparkSession.builder().appName("DemoSparkKafka").getOrCreate()
import spark.implicits._
val dataFromTopicDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.")
val topicQuery = dataFromTopicDF.writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "/tmp/checkpoint")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start()

topicQuery.awaitTermination()
topicQuery.stop()


The above code works well, but when I try to write to Parquet on
HDFS I get the exceptions below.


logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS")

val parquetQuery = dataFromTopicDF.writeStream
.format("parquet")
.option("startingOffsets", "earliest")
.option("checkpointLocation", "/tmp/checkpoint")
.option("path", "/sample-topic")
.start()

parquetQuery.awaitTermination()
parquetQuery.stop()


*Exception Details:*


Exception in thread "main" java.io.IOException: mkdir of
/sample-topic/_spark_metadata failed
at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067)
at 
org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176)
at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:66)
at 
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46)
at 
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:85)
at 
org.apache.spark.sql.execution.streaming.FileStreamSink.(FileStreamSink.scala:98)
at 
org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317)
at 
org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293)
at 
com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35)
at 
com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7)
at com.dynasty.poc.DemoSparkKafka.main(DemoSparkKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Thanks
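
A hedged sketch along the lines of the suggestion in the replies: write the sink
and its checkpoint under a user-owned HDFS path rather than the root directory.
The /user/... paths are placeholders, and note that startingOffsets belongs on
the read side, not on the sink:

val parquetQuery = dataFromTopicDF.writeStream
  .format("parquet")
  .option("checkpointLocation", "/user/spark-user/checkpoints/sample-topic")
  .option("path", "/user/spark-user/data/sample-topic")
  .start()

parquetQuery.awaitTermination()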


Re: Update / Delete records in Parquet

2019-05-03 Thread Chetan Khatri
Agreed with delta.io, I am exploring both options

On Wed, May 1, 2019 at 2:50 PM Vitaliy Pisarev 
wrote:

> Ankit, you should take a look at delta.io that was recently open sourced
> by databricks.
>
> Full DML support is on the way.
>
>
>
> *From: *"Khare, Ankit" 
> *Date: *Tuesday, 23 April 2019 at 11:35
> *To: *Chetan Khatri , Jason Nerothin <
> jasonnerot...@gmail.com>
> *Cc: *user 
> *Subject: *Re: Update / Delete records in Parquet
>
>
>
> Hi Chetan,
>
>
>
> I also agree that for this usecase parquet would not be the best option .
> I had similar usecase ,
>
>
>
> 50 different tables to be download from MSSQL .
>
>
>
> Source : MSSQL
>
> Destination. : Apache KUDU (Since it supports very well change data
> capture use cases)
>
>
>
> We used Streamset CDC module to connect to MSSQL and then get CDC data to
> Apache KUDU
>
>
>
> Total records. : 3 B
>
>
>
> Thanks
>
> Ankit
>
>
>
>
>
> *From: *Chetan Khatri 
> *Date: *Tuesday, 23. April 2019 at 05:58
> *To: *Jason Nerothin 
> *Cc: *user 
> *Subject: *Re: Update / Delete records in Parquet
>
>
>
> Hello Jason, Thank you for reply. My use case is that, first time I do
> full load and transformation/aggregation/joins and write to parquet (as
> staging) but next time onwards my source is MSSQL Server, I want to pull
> only those records got changed / updated and would like to update at
> parquet also if possible without side effects.
>
>
> https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/work-with-change-tracking-sql-server?view=sql-server-2017
>
>
>
> On Tue, Apr 23, 2019 at 3:02 AM Jason Nerothin 
> wrote:
>
> Hi Chetan,
>
>
>
> Do you have to use Parquet?
>
>
>
> It just feels like it might be the wrong sink for a high-frequency change
> scenario.
>
>
>
> What are you trying to accomplish?
>
>
>
> Thanks,
> Jason
>
>
>
> On Mon, Apr 22, 2019 at 2:09 PM Chetan Khatri 
> wrote:
>
> Hello All,
>
>
>
> If I am doing incremental load / delta and would like to update / delete
> the records in parquet, I understands that parquet is immutable and can't
> be deleted / updated theoretically only append / overwrite can be done. But
> I can see utility tools which claims to add value for that.
>
>
>
> https://github.com/Factual/parquet-rewriter
>
>
>
> Please throw a light.
>
>
>
> Thanks
>
>
>
>
> --
>
> Thanks,
>
> Jason
>
>


Re: Update / Delete records in Parquet

2019-04-22 Thread Chetan Khatri
Hello Jason, thank you for the reply. My use case is that the first time I do a full
load with transformations/aggregations/joins and write to Parquet (as staging),
but from the next run onwards my source is MSSQL Server and I want to pull only
the records that got changed / updated, and would also like to apply those
updates to the Parquet data, if possible without side effects.
https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/work-with-change-tracking-sql-server?view=sql-server-2017

On Tue, Apr 23, 2019 at 3:02 AM Jason Nerothin 
wrote:

> Hi Chetan,
>
> Do you have to use Parquet?
>
> It just feels like it might be the wrong sink for a high-frequency change
> scenario.
>
> What are you trying to accomplish?
>
> Thanks,
> Jason
>
> On Mon, Apr 22, 2019 at 2:09 PM Chetan Khatri 
> wrote:
>
>> Hello All,
>>
>> If I am doing incremental load / delta and would like to update / delete
>> the records in parquet, I understands that parquet is immutable and can't
>> be deleted / updated theoretically only append / overwrite can be done. But
>> I can see utility tools which claims to add value for that.
>>
>> https://github.com/Factual/parquet-rewriter
>>
>> Please throw a light.
>>
>> Thanks
>>
>
>
> --
> Thanks,
> Jason
>


Update / Delete records in Parquet

2019-04-22 Thread Chetan Khatri
Hello All,

If I am doing an incremental / delta load and would like to update or delete
records in Parquet, I understand that Parquet is immutable and, in theory, can't
be updated or deleted; only append / overwrite can be done. But
I can see utility tools which claim to add value for that.

https://github.com/Factual/parquet-rewriter

Please throw some light on this.

Thanks


Usage of Explicit Future in Spark program

2019-04-21 Thread Chetan Khatri
Hello Spark Users,

Someone has suggested wrapping 5-6 independent transformation blocks in functions
that take one String argument and return a Future[String], claiming this can
improve the performance. I am wondering whether this is a proper use of explicit
Futures in Spark?

Sample code is below:

import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val writeDataFutures = ListBuffer[Future[String]]()

def writeData(tableName: String): Future[String] = Future {
  // some heavy lifting Spark transformations, 5-6 read -> transform -> load.
  tableName
}

writeDataFutures += writeData("dynamicFieldData")

writeDataFutures foreach { writeDataFuture =>
  Await.ready(writeDataFuture, Duration.Inf).onComplete {
    case Success(table) => logger.info(s"All Success")
    case Failure(e) => e.printStackTrace()
  }
}


Please suggest technical doubt.

Thanks
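
A hedged, cleaned-up variant of the suggested pattern: collect the futures and
block once on Future.sequence, so any failure surfaces and fails the job.
Concurrent actions like this share the one SparkContext and can help when the
write blocks are truly independent. The table names below are placeholders and
writeData is the function sketched above:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val tables  = Seq("dynamicFieldData", "otherTableData")  // placeholder table names
val futures = tables.map(writeData)

// Await.result rethrows the first failure, which fails the whole job;
// otherwise it returns the result of every write.
val written = Await.result(Future.sequence(futures), Duration.Inf)
println(s"All writes finished: ${written.mkString(", ")}")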


Re: How to print DataFrame.show(100) to text file at HDFS

2019-04-14 Thread Chetan Khatri
Nuthan,

Thank you for the reply. The proposed solution captures everything, whereas for me it is
one DataFrame show(100) inside 3000 lines of Scala Spark code.
However, yarn logs --applicationId  > 1.log also gives all of the
stdout and stderr.

Thanks

On Sun, Apr 14, 2019 at 10:30 AM Nuthan Reddy 
wrote:

> Hi Chetan,
>
> You can use
>
> spark-submit showDF.py | hadoop fs -put - showDF.txt
>
> showDF.py:
>
> from pyspark.sql import SparkSession
>
>
> spark = SparkSession.builder.appName("Write stdout").getOrCreate()
>
> spark.sparkContext.setLogLevel("OFF")
>
>
> spark.table("").show(100,truncate=false)
>
> But is there any specific reason you want to write it to hdfs? Is this for
> human consumption?
>
> Regards,
> Nuthan
>
> On Sat, Apr 13, 2019 at 6:41 PM Chetan Khatri 
> wrote:
>
>> Hello Users,
>>
>> In spark when I have a DataFrame and do  .show(100) the output which gets
>> printed, I wants to save as it is content to txt file in HDFS.
>>
>> How can I do this?
>>
>> Thanks
>>
>
>
> --
> Nuthan Reddy
> Sigmoid Analytics
>
>
> *Disclaimer*: This is not a mass e-mail and my intention here is purely
> from a business perspective, and not to spam or encroach your privacy. I am
> writing with a specific agenda to build a personal business connection.
> Being a reputed and genuine organization, Sigmoid respects the digital
> security of every prospect and tries to comply with GDPR and other regional
> laws. Please let us know if you feel otherwise and we will rectify the
> misunderstanding and adhere to comply in the future. In case we have missed
> any of the compliance, it is completely unintentional.
>


How to print DataFrame.show(100) to text file at HDFS

2019-04-13 Thread Chetan Khatri
Hello Users,

In Spark, when I have a DataFrame and do .show(100), I want to save the
printed output as-is to a text file in HDFS.

How can I do this?

Thanks
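
One hedged approach (not the only one): show() prints through Scala's Console,
so its output can be captured in memory and then written out as a single text
file on HDFS. df, spark and the output path below are assumed from the context
of the question:

import java.io.ByteArrayOutputStream

val buffer = new ByteArrayOutputStream()
Console.withOut(buffer) {
  df.show(100, truncate = false)  // the rendered table goes into buffer
}

spark.sparkContext
  .parallelize(Seq(buffer.toString("UTF-8")), 1)
  .saveAsTextFile("hdfs:///user/spark-user/show-output")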


Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Chetan Khatri
Abdeali, Jason:

While submitting the Spark job I used num-executors 8, num-cores 8, driver-memory 14g
and executor-memory 14g; the total data processed was about 5 GB,
with 100+ aggregations and 50+ different joins at various DataFrame levels.

So it is really hard to give a specific number of partitions, but I have not
done a repartition / coalesce, so the default of 200 would be used, I guess.

I read a long time ago that the Window function is a performance killer, so I
wanted to clarify my doubt.

Thanks



On Thu, Apr 4, 2019 at 10:43 PM Jason Nerothin 
wrote:

> My thinking is that if you run everything in one partition - say 12 GB -
> then you don't experience the partitioning problem - one partition will
> have all duplicates.
>
> If that's not the case, there are other options, but would probably
> require a design change.
>
> On Thu, Apr 4, 2019 at 8:46 AM Jason Nerothin 
> wrote:
>
>> How much memory do you have per partition?
>>
>> On Thu, Apr 4, 2019 at 7:49 AM Chetan Khatri 
>> wrote:
>>
>>> I will get the information and will share with you.
>>>
>>> On Thu, Apr 4, 2019 at 5:03 PM Abdeali Kothari 
>>> wrote:
>>>
>>>> How long does it take to do the window solution ? (Also mention how
>>>> many executors was your spark application using on average during that 
>>>> time)
>>>> I am not aware of anything that is faster. When I ran is on my data
>>>> ~8-9GB I think it took less than 5 mins (don't remember exact time)
>>>>
>>>> On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri <
>>>> chetan.opensou...@gmail.com> wrote:
>>>>
>>>>> Thanks for awesome clarification / explanation.
>>>>>
>>>>> I have cases where update_time can be same.
>>>>> I am in need of suggestions, where I have very large data like 5 GB,
>>>>> this window based solution which I mentioned is taking very long time.
>>>>>
>>>>> Thanks again.
>>>>>
>>>>> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari <
>>>>> abdealikoth...@gmail.com> wrote:
>>>>>
>>>>>> So, the above code for min() worked for me fine in general, but there
>>>>>> was one corner case where it failed.
>>>>>> Which was when I have something like:
>>>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>>>> invoice_id=1, update_time=2018-02-03 14:00:00.000
>>>>>>
>>>>>> In this example, the update_time for 2 records is the exact same. So,
>>>>>> doing a filter for the min() will result in 2 records for the 
>>>>>> invoice_id=1.
>>>>>> This is avoided in your code snippet of row_num - because 2 rows will
>>>>>> never have row_num = 1
>>>>>>
>>>>>> But note that here - row_num=1 and row_num=2 will be randomly ordered
>>>>>> (because orderBy is on update_time and they have the same value of
>>>>>> update_time).
>>>>>> Hence dropDuplicates can be used there cause it can be either one of
>>>>>> those rows.
>>>>>>
>>>>>> Overall - dropDuplicates seems like it's meant for cases where you
>>>>>> literally have redundant duplicated data. And not for filtering to get
>>>>>> first/last etc.
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <
>>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Abdeali, Thank you for your response.
>>>>>>>
>>>>>>> Can you please explain me this line, And the dropDuplicates at the
>>>>>>> end ensures records with two values for the same 'update_time' don't 
>>>>>>> cause
>>>>>>> issues.
>>>>>>>
>>>>>>> Sorry I didn't get quickly. :)
>>>>>>>
>>>>>>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <
>>>>>>> abdealikoth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I've faced this issue too - and a colleague pointed me to the
>>>>>>>> documentation -
>>>>>>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>>>>>>> dropDuplicates docs does

Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Chetan Khatri
I will get the information and will share with you.

On Thu, Apr 4, 2019 at 5:03 PM Abdeali Kothari 
wrote:

> How long does it take to do the window solution ? (Also mention how many
> executors was your spark application using on average during that time)
> I am not aware of anything that is faster. When I ran is on my data ~8-9GB
> I think it took less than 5 mins (don't remember exact time)
>
> On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri 
> wrote:
>
>> Thanks for awesome clarification / explanation.
>>
>> I have cases where update_time can be same.
>> I am in need of suggestions, where I have very large data like 5 GB, this
>> window based solution which I mentioned is taking very long time.
>>
>> Thanks again.
>>
>> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari 
>> wrote:
>>
>>> So, the above code for min() worked for me fine in general, but there
>>> was one corner case where it failed.
>>> Which was when I have something like:
>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>> invoice_id=1, update_time=2018-02-03 14:00:00.000
>>>
>>> In this example, the update_time for 2 records is the exact same. So,
>>> doing a filter for the min() will result in 2 records for the invoice_id=1.
>>> This is avoided in your code snippet of row_num - because 2 rows will
>>> never have row_num = 1
>>>
>>> But note that here - row_num=1 and row_num=2 will be randomly ordered
>>> (because orderBy is on update_time and they have the same value of
>>> update_time).
>>> Hence dropDuplicates can be used there cause it can be either one of
>>> those rows.
>>>
>>> Overall - dropDuplicates seems like it's meant for cases where you
>>> literally have redundant duplicated data. And not for filtering to get
>>> first/last etc.
>>>
>>>
>>> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Hello Abdeali, Thank you for your response.
>>>>
>>>> Can you please explain me this line, And the dropDuplicates at the end
>>>> ensures records with two values for the same 'update_time' don't cause
>>>> issues.
>>>>
>>>> Sorry I didn't get quickly. :)
>>>>
>>>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <
>>>> abdealikoth...@gmail.com> wrote:
>>>>
>>>>> I've faced this issue too - and a colleague pointed me to the
>>>>> documentation -
>>>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>>>> dropDuplicates docs does not say that it will guarantee that it will
>>>>> return the "first" record (even if you sort your dataframe)
>>>>> It would give you any record it finds and just ensure that duplicates
>>>>> are not present.
>>>>>
>>>>> The only way I know of how to do this is what you did, but you can
>>>>> avoid the sorting inside the partition with something like (in pyspark):
>>>>>
>>>>> from pyspark.sql import Window, functions as F
>>>>> df = df.withColumn('wanted_time',
>>>>> F.min('update_time').over(Window.partitionBy('invoice_id')))
>>>>> out_df = df.filter(df['update_time'] == df['wanted_time'])
>>>>> .drop('wanted_time').dropDuplicates('invoice_id', 'update_time')
>>>>>
>>>>> The min() is faster than doing an orderBy() and a row_number().
>>>>> And the dropDuplicates at the end ensures records with two values for
>>>>> the same 'update_time' don't cause issues.
>>>>>
>>>>>
>>>>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>
>>>>>> Hello Dear Spark Users,
>>>>>>
>>>>>> I am using dropDuplicate on a DataFrame generated from large parquet
>>>>>> file from(HDFS) and doing dropDuplicate based on timestamp based column,
>>>>>> every time I run it drops different - different rows based on same
>>>>>> timestamp.
>>>>>>
>>>>>> What I tried and worked
>>>>>>
>>>>>> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".
>>>>>> desc)
>>>>>>
>>>>>> val irqDistinctDF = irqFilteredDF.withColumn("rn",
>>>>>> row_number.over(wSpec)).where($"rn" === 1)
>>>>>> .drop("rn").drop("update_time")
>>>>>>
>>>>>> But this is damn slow...
>>>>>>
>>>>>> Can someone please throw a light.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>


Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Chetan Khatri
Thanks for the awesome clarification / explanation.

I have cases where update_time can be the same.
I am in need of suggestions: with very large data, around 5 GB, the
window-based solution which I mentioned is taking a very long time.

Thanks again.

On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari 
wrote:

> So, the above code for min() worked for me fine in general, but there was
> one corner case where it failed.
> Which was when I have something like:
> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
> invoice_id=1, update_time=2018-02-03 14:00:00.000
>
> In this example, the update_time for 2 records is the exact same. So,
> doing a filter for the min() will result in 2 records for the invoice_id=1.
> This is avoided in your code snippet of row_num - because 2 rows will
> never have row_num = 1
>
> But note that here - row_num=1 and row_num=2 will be randomly ordered
> (because orderBy is on update_time and they have the same value of
> update_time).
> Hence dropDuplicates can be used there cause it can be either one of those
> rows.
>
> Overall - dropDuplicates seems like it's meant for cases where you
> literally have redundant duplicated data. And not for filtering to get
> first/last etc.
>
>
> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri 
> wrote:
>
>> Hello Abdeali, Thank you for your response.
>>
>> Can you please explain me this line, And the dropDuplicates at the end
>> ensures records with two values for the same 'update_time' don't cause
>> issues.
>>
>> Sorry I didn't get quickly. :)
>>
>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari 
>> wrote:
>>
>>> I've faced this issue too - and a colleague pointed me to the
>>> documentation -
>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>> dropDuplicates docs does not say that it will guarantee that it will
>>> return the "first" record (even if you sort your dataframe)
>>> It would give you any record it finds and just ensure that duplicates
>>> are not present.
>>>
>>> The only way I know of how to do this is what you did, but you can avoid
>>> the sorting inside the partition with something like (in pyspark):
>>>
>>> from pyspark.sql import Window, functions as F
>>> df = df.withColumn('wanted_time',
>>> F.min('update_time').over(Window.partitionBy('invoice_id')))
>>> out_df = df.filter(df['update_time'] == df['wanted_time'])
>>> .drop('wanted_time').dropDuplicates('invoice_id', 'update_time')
>>>
>>> The min() is faster than doing an orderBy() and a row_number().
>>> And the dropDuplicates at the end ensures records with two values for
>>> the same 'update_time' don't cause issues.
>>>
>>>
>>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Hello Dear Spark Users,
>>>>
>>>> I am using dropDuplicate on a DataFrame generated from large parquet
>>>> file from(HDFS) and doing dropDuplicate based on timestamp based column,
>>>> every time I run it drops different - different rows based on same
>>>> timestamp.
>>>>
>>>> What I tried and worked
>>>>
>>>> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".
>>>> desc)
>>>>
>>>> val irqDistinctDF = irqFilteredDF.withColumn("rn",
>>>> row_number.over(wSpec)).where($"rn" === 1)
>>>> .drop("rn").drop("update_time")
>>>>
>>>> But this is damn slow...
>>>>
>>>> Can someone please throw a light.
>>>>
>>>> Thanks
>>>>
>>>>


Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Chetan Khatri
Hello Abdeali, thank you for your response.

Can you please explain this line to me: "And the dropDuplicates at the end
ensures records with two values for the same 'update_time' don't cause
issues."

Sorry, I didn't get it at first. :)

On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari 
wrote:

> I've faced this issue too - and a colleague pointed me to the
> documentation -
> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
> dropDuplicates docs does not say that it will guarantee that it will
> return the "first" record (even if you sort your dataframe)
> It would give you any record it finds and just ensure that duplicates are
> not present.
>
> The only way I know of how to do this is what you did, but you can avoid
> the sorting inside the partition with something like (in pyspark):
>
> from pyspark.sql import Window, functions as F
> df = df.withColumn('wanted_time',
> F.min('update_time').over(Window.partitionBy('invoice_id')))
> out_df = df.filter(df['update_time'] == df['wanted_time'])
> .drop('wanted_time').dropDuplicates('invoice_id', 'update_time')
>
> The min() is faster than doing an orderBy() and a row_number().
> And the dropDuplicates at the end ensures records with two values for the
> same 'update_time' don't cause issues.
>
>
> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri 
> wrote:
>
>> Hello Dear Spark Users,
>>
>> I am using dropDuplicate on a DataFrame generated from large parquet file
>> from(HDFS) and doing dropDuplicate based on timestamp based column, every
>> time I run it drops different - different rows based on same timestamp.
>>
>> What I tried and worked
>>
>> val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".
>> desc)
>>
>> val irqDistinctDF = irqFilteredDF.withColumn("rn",
>> row_number.over(wSpec)).where($"rn" === 1) .drop("rn").drop("update_time"
>> )
>>
>> But this is damn slow...
>>
>> Can someone please throw a light.
>>
>> Thanks
>>
>>


dropDuplicate on timestamp based column unexpected output

2019-04-03 Thread Chetan Khatri
Hello Dear Spark Users,

I am using dropDuplicates on a DataFrame generated from a large Parquet file
(from HDFS), deduplicating on a timestamp-based column; every time I run it,
it drops different rows for the same timestamp.

What I tried, and what worked:

val wSpec = Window.partitionBy($"invoice_id").orderBy($"update_time".desc)

val irqDistinctDF = irqFilteredDF.withColumn("rn",
row_number.over(wSpec)).where($"rn" === 1) .drop("rn").drop("update_time")

But this is damn slow...

Can someone please throw some light on this?

Thanks
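
A hedged Scala translation of the min()-over-window idea from the replies (the
original snippet in the thread is PySpark); the column and DataFrame names
follow the ones above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, min}

val byInvoice = Window.partitionBy(col("invoice_id"))

val earliestDF = irqFilteredDF
  .withColumn("wanted_time", min(col("update_time")).over(byInvoice))
  .filter(col("update_time") === col("wanted_time"))
  .drop("wanted_time")
  // if two rows share the minimum update_time, keep only one of them
  .dropDuplicates("invoice_id", "update_time")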


Re: Increase time for Spark Job to be in Accept mode in Yarn

2019-01-23 Thread Chetan Khatri
Hello Beliefer,
I am orchestrating many Spark jobs using Airflow. Some of the Spark
jobs get started and keep running while many others remain in the ACCEPTED
state, and sometimes 1-2 jobs go to the FAILED state if YARN cannot create the
application container.

Thanks

On Wed, Jan 23, 2019 at 9:15 AM 大啊  wrote:

> Hi , please tell me why you need to increase the time?
>
>
>
>
>
> At 2019-01-22 18:38:29, "Chetan Khatri" 
> wrote:
>
> Hello Spark Users,
>
> Can you please tell me how to increase the time for Spark job to be in
> *Accept* mode in Yarn.
>
> Thank you. Regards,
> Chetan
>
>
>
>
>


Increase time for Spark Job to be in Accept mode in Yarn

2019-01-22 Thread Chetan Khatri
Hello Spark Users,

Can you please tell me how to increase the time a Spark job can stay in the
*ACCEPTED* state in YARN.

Thank you. Regards,
Chetan


Re: How to Keep Null values in Parquet

2018-11-21 Thread Chetan Khatri
Hello Soumya,

Thanks for the quick response. I haven't tried that yet; I am doing it now and will see.


On Thu, Nov 22, 2018 at 8:13 AM Soumya D. Sanyal 
wrote:

> Hi Chetan,
>
> Have you tried casting the null values/columns to a supported type — e.g.
> `StringType`, `IntegerType`, etc?
>
> See also https://issues.apache.org/jira/browse/SPARK-10943.
>
> — Soumya
>
>
> On Nov 21, 2018, at 9:29 PM, Chetan Khatri 
> wrote:
>
> Hello Spark Users,
>
> I have a Dataframe with some of Null Values, When I am writing to parquet
> it is failing with below error:
>
> Caused by: java.lang.RuntimeException: Unsupported data type NullType.
>   at scala.sys.package$.error(package.scala:27)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.org 
> <http://org.apache.spark.sql.execution.datasources.parquet.parquetwritesupport.org/>$apache$spark$sql$execution$datasources$parquet$ParquetWriteSupport$$makeWriter(ParquetWriteSupport.scala:206)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$init$2.apply(ParquetWriteSupport.scala:93)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$init$2.apply(ParquetWriteSupport.scala:93)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.init(ParquetWriteSupport.scala:93)
>   at 
> org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:341)
>   at 
> org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetOutputWriter.scala:37)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:367)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:378)
>
> Thanks
>
>
>
>


How to Keep Null values in Parquet

2018-11-21 Thread Chetan Khatri
Hello Spark Users,

I have a DataFrame with some null values; when I am writing it to Parquet,
it fails with the error below:

Caused by: java.lang.RuntimeException: Unsupported data type NullType.
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.org$apache$spark$sql$execution$datasources$parquet$ParquetWriteSupport$$makeWriter(ParquetWriteSupport.scala:206)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$init$2.apply(ParquetWriteSupport.scala:93)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$init$2.apply(ParquetWriteSupport.scala:93)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.init(ParquetWriteSupport.scala:93)
at 
org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:341)
at 
org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetOutputWriter.scala:37)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:367)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:378)

Thanks
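
A hedged sketch of the cast suggested in the reply: rewrite any NullType column
to a supported type (StringType here) before writing. outputDF and the output
path are placeholders:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{NullType, StringType}

// Replace NullType columns, which the Parquet writer rejects, with string columns.
val writableDF = outputDF.schema.fields.foldLeft(outputDF) { (acc, field) =>
  if (field.dataType == NullType)
    acc.withColumn(field.name, col(field.name).cast(StringType))
  else acc
}

writableDF.write.parquet("/user/spark-user/staging/output")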


Spark 2.3.0 with HDP Got completely successfully but status FAILED with error

2018-11-21 Thread Chetan Khatri
Hello Spark Users,

I am working with Spark 2.3.0 on the HDP distribution, where my Spark job
completed successfully but the final application status is FAILED with the error below.

What is the best way to prevent this kind of error? Thanks


8/11/21 17:38:15 INFO ApplicationMaster: Final app status: SUCCEEDED,
exitCode: 0

18/11/21 17:38:15 INFO SparkContext: Invoking stop() from shutdown hook


18/11/21 17:38:25 WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout,
java.util.concurrent.TimeoutException

java.util.concurrent.TimeoutException

at java.util.concurrent.FutureTask.get(FutureTask.java:205)

at
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:67)

18/11/21 17:38:25 ERROR Utils: Uncaught exception in thread pool-1-thread-1

java.lang.InterruptedException

at java.lang.Object.wait(Native Method)

at java.lang.Thread.join(Thread.java:1252)

at java.lang.Thread.join(Thread.java:1326)

at
org.apache.spark.scheduler.AsyncEventQueue.stop(AsyncEventQueue.scala:133)

at
org.apache.spark.scheduler.LiveListenerBus$$anonfun$stop$1.apply(LiveListenerBus.scala:219)

at
org.apache.spark.scheduler.LiveListenerBus$$anonfun$stop$1.apply(LiveListenerBus.scala:219)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at
org.apache.spark.scheduler.LiveListenerBus.stop(LiveListenerBus.scala:219)

at
org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1922)

at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)

at org.apache.spark.SparkContext.stop(SparkContext.scala:1921)

at
org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:573)

at
org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)

at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)

at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)

at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)

at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)

at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)

at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)

at scala.util.Try$.apply(Try.scala:192)

at
org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)

at
org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

18/11/21 17:38:28 WARN DFSClient: Unable to persist blocks in hflush for
/spark2-history/application_1542818587370_0020_2.inprogress

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
No lease on /spark2-history/application_1542818587370_0020_2.inprogress
(inode 105059): File is not open for writing. Holder
DFSClient_NONMAPREDUCE_-1311526457_1 does not have any open files.

at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3712)

at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.fsync(FSNamesystem.java:4324)

at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.fsync(NameNodeRpcServer.java:1354)

at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.fsync(ClientNamenodeProtocolServerSideTranslatorPB.java:926)

at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)

at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)

at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)



at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)

at org.apache.hadoop.ipc.Client.call(Client.java:1498)

at 

How to do efficient self join with Spark-SQL and Scala

2018-09-21 Thread Chetan Khatri
Dear Spark Users,

I came across a slightly weird MSSQL query to replace with Spark, and I have
no clue how to do it in an efficient way with Scala + Spark SQL. Can someone
please throw some light on this? I can create a view over the DataFrame and run it as
*spark.sql*(query),
but I would like to do it the Scala + Spark DataFrame way.

Sample:

select a.student_id, a.candidate_id, a.student_name, a.student_standard,
       a.student_city, b.teacher_name, a.student_status, a.approval_id,
       case when a.approval_id = 2 and (a.academic_start_date is null
                 and not exists (select student_id from tbl_student
                                 where candidate_id = c.candidate_id
                                   and approval_id = 2
                                   and academic_start_date is null))
            then 'Yes' else 'No' end as is_current
from tbl_student a
inner join tbl_teacher b on a.candidate_id = b.candidate_id
inner join tbl_network c on c.candidate_id = a.candidate_id

Thank you.
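
A rough DataFrame-API sketch of the same query shape, using a pre-aggregated set
plus a left join to emulate the correlated NOT EXISTS; the table and column names
are taken from the SQL above and the whole thing is illustrative only:

import org.apache.spark.sql.functions.{col, lit, when}

val student = spark.table("tbl_student")
val teacher = spark.table("tbl_teacher")
val network = spark.table("tbl_network")

// candidate_ids for which a student row with approval_id = 2 and a null
// academic_start_date exists -- the set the NOT EXISTS subquery checks.
val pendingStarts = student
  .filter(col("approval_id") === 2 && col("academic_start_date").isNull)
  .select(col("candidate_id")).distinct()
  .withColumn("has_pending_start", lit(true))

val result = student.as("a")
  .join(teacher.as("b"), col("a.candidate_id") === col("b.candidate_id"))
  .join(network.as("c"), col("c.candidate_id") === col("a.candidate_id"))
  .join(pendingStarts.as("p"), col("p.candidate_id") === col("c.candidate_id"), "left")
  .select(
    col("a.student_id"), col("a.candidate_id"), col("a.student_name"),
    col("a.student_standard"), col("a.student_city"), col("b.teacher_name"),
    col("a.student_status"), col("a.approval_id"),
    when(col("a.approval_id") === 2 &&
         col("a.academic_start_date").isNull &&
         col("p.has_pending_start").isNull, "Yes").otherwise("No").as("is_current"))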


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-15 Thread Chetan Khatri
Hello Jayant,

Thanks for great OSS Contribution :)

On Thu, Jul 12, 2018 at 1:36 PM, Jayant Shekhar 
wrote:

> Hello Chetan,
>
> Sorry missed replying earlier. You can find some sample code here :
>
> http://sparkflows.readthedocs.io/en/latest/user-guide/
> python/pipe-python.html
>
> We will continue adding more there.
>
> Feel free to ping me directly in case of questions.
>
> Thanks,
> Jayant
>
>
> On Mon, Jul 9, 2018 at 9:56 PM, Chetan Khatri  > wrote:
>
>> Hello Jayant,
>>
>> Thank you so much for suggestion. My view was to  use Python function as
>> transformation which can take couple of column names and return object.
>> which you explained. would that possible to point me to similiar codebase
>> example.
>>
>> Thanks.
>>
>> On Fri, Jul 6, 2018 at 2:56 AM, Jayant Shekhar 
>> wrote:
>>
>>> Hello Chetan,
>>>
>>> We have currently done it with .pipe(.py) as Prem suggested.
>>>
>>> That passes the RDD as CSV strings to the python script. The python
>>> script can either process it line by line, create the result and return it
>>> back. Or create things like Pandas Dataframe for processing and finally
>>> write the results back.
>>>
>>> In the Spark/Scala/Java code, you get an RDD of string, which we convert
>>> back to a Dataframe.
>>>
>>> Feel free to ping me directly in case of questions.
>>>
>>> Thanks,
>>> Jayant
>>>
>>>
>>> On Thu, Jul 5, 2018 at 3:39 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Prem sure, Thanks for suggestion.
>>>>
>>>> On Wed, Jul 4, 2018 at 8:38 PM, Prem Sure 
>>>> wrote:
>>>>
>>>>> try .pipe(.py) on RDD
>>>>>
>>>>> Thanks,
>>>>> Prem
>>>>>
>>>>> On Wed, Jul 4, 2018 at 7:59 PM, Chetan Khatri <
>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>
>>>>>> Can someone please suggest me , thanks
>>>>>>
>>>>>> On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, <
>>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Dear Spark User / Dev,
>>>>>>>
>>>>>>> I would like to pass Python user defined function to Spark Job
>>>>>>> developed using Scala and return value of that function would be 
>>>>>>> returned
>>>>>>> to DF / Dataset API.
>>>>>>>
>>>>>>> Can someone please guide me, which would be best approach to do
>>>>>>> this. Python function would be mostly transformation function. Also 
>>>>>>> would
>>>>>>> like to pass Java Function as a String to Spark / Scala job and it 
>>>>>>> applies
>>>>>>> to RDD / Data Frame and should return RDD / Data Frame.
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>
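For readers landing on this thread from the archive, here is a rough,
self-contained sketch of the RDD.pipe approach described above. The script
path /opt/scripts/transform.py, the column layout, and the CSV round-trip are
made-up assumptions for illustration; the real script and output schema would
of course be your own:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object PipePythonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("pipe-python-sketch").getOrCreate()
    import spark.implicits._

    val input = Seq(("1", "hello"), ("2", "world")).toDF("id", "text")

    // Serialize each row to a CSV line, pipe the lines through the external
    // Python script, and collect whatever the script prints on stdout.
    val piped = input.rdd
      .map(row => row.mkString(","))
      .pipe("python /opt/scripts/transform.py")  // hypothetical script path

    // Parse the script's CSV output back into a DataFrame with a known schema.
    val outputSchema = StructType(Seq(
      StructField("id", StringType),
      StructField("processed_text", StringType)))

    val rows = piped.map { line =>
      val parts = line.split(",", -1)
      Row(parts(0), parts(1))
    }

    val result = spark.createDataFrame(rows, outputSchema)
    result.show()
    spark.stop()
  }
}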


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-09 Thread Chetan Khatri
Hello Jayant,

Thank you so much for the suggestion. My view was to use a Python function as a
transformation which can take a couple of column names and return an object,
which is what you explained. Would it be possible to point me to a similar
codebase example?

Thanks.

On Fri, Jul 6, 2018 at 2:56 AM, Jayant Shekhar 
wrote:

> Hello Chetan,
>
> We have currently done it with .pipe(.py) as Prem suggested.
>
> That passes the RDD as CSV strings to the python script. The python script
> can either process it line by line, create the result and return it back.
> Or create things like Pandas Dataframe for processing and finally write the
> results back.
>
> In the Spark/Scala/Java code, you get an RDD of string, which we convert
> back to a Dataframe.
>
> Feel free to ping me directly in case of questions.
>
> Thanks,
> Jayant
>
>
> On Thu, Jul 5, 2018 at 3:39 AM, Chetan Khatri  > wrote:
>
>> Prem sure, Thanks for suggestion.
>>
>> On Wed, Jul 4, 2018 at 8:38 PM, Prem Sure  wrote:
>>
>>> try .pipe(.py) on RDD
>>>
>>> Thanks,
>>> Prem
>>>
>>> On Wed, Jul 4, 2018 at 7:59 PM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Can someone please suggest me , thanks
>>>>
>>>> On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, 
>>>> wrote:
>>>>
>>>>> Hello Dear Spark User / Dev,
>>>>>
>>>>> I would like to pass Python user defined function to Spark Job
>>>>> developed using Scala and return value of that function would be returned
>>>>> to DF / Dataset API.
>>>>>
>>>>> Can someone please guide me, which would be best approach to do this.
>>>>> Python function would be mostly transformation function. Also would like 
>>>>> to
>>>>> pass Java Function as a String to Spark / Scala job and it applies to RDD 
>>>>> /
>>>>> Data Frame and should return RDD / Data Frame.
>>>>>
>>>>> Thank you.
>>>>>
>>>>>
>>>>>
>>>>>
>>>
>>
>


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-05 Thread Chetan Khatri
Prem sure, Thanks for suggestion.

On Wed, Jul 4, 2018 at 8:38 PM, Prem Sure  wrote:

> try .pipe(.py) on RDD
>
> Thanks,
> Prem
>
> On Wed, Jul 4, 2018 at 7:59 PM, Chetan Khatri  > wrote:
>
>> Can someone please suggest me , thanks
>>
>> On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, 
>> wrote:
>>
>>> Hello Dear Spark User / Dev,
>>>
>>> I would like to pass Python user defined function to Spark Job developed
>>> using Scala and return value of that function would be returned to DF /
>>> Dataset API.
>>>
>>> Can someone please guide me, which would be best approach to do this.
>>> Python function would be mostly transformation function. Also would like to
>>> pass Java Function as a String to Spark / Scala job and it applies to RDD /
>>> Data Frame and should return RDD / Data Frame.
>>>
>>> Thank you.
>>>
>>>
>>>
>>>
>


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-04 Thread Chetan Khatri
Can someone please suggest an approach? Thanks.

On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, 
wrote:

> Hello Dear Spark User / Dev,
>
> I would like to pass Python user defined function to Spark Job developed
> using Scala and return value of that function would be returned to DF /
> Dataset API.
>
> Can someone please guide me, which would be best approach to do this.
> Python function would be mostly transformation function. Also would like to
> pass Java Function as a String to Spark / Scala job and it applies to RDD /
> Data Frame and should return RDD / Data Frame.
>
> Thank you.
>
>
>
>


Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-03 Thread Chetan Khatri
Hello Dear Spark User / Dev,

I would like to pass a Python user-defined function to a Spark job developed in
Scala, and have the return value of that function come back to the DataFrame /
Dataset API.

Can someone please guide me on the best approach to do this? The Python
function would mostly be a transformation function. I would also like to pass a
Java function as a string to the Spark / Scala job, have it applied to an RDD /
DataFrame, and have it return an RDD / DataFrame.

Thank you.


Re: Apply Core Java Transformation UDF on DataFrame

2018-06-05 Thread Chetan Khatri
Can anyone throw light on this? It would be helpful.

On Tue, Jun 5, 2018 at 1:41 AM, Chetan Khatri 
wrote:

> All,
>
> I would like to Apply Java Transformation UDF on DataFrame created from
> Table, Flat Files and retrun new Data Frame Object. Any suggestions, with
> respect to Spark Internals.
>
> Thanks.
>


Apply Core Java Transformation UDF on DataFrame

2018-06-04 Thread Chetan Khatri
All,

I would like to apply a core Java transformation UDF on a DataFrame created
from a table or flat files, and return a new DataFrame object. Any suggestions,
with respect to Spark internals?

Thanks.
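A minimal sketch of the usual pattern, in case it helps: wrap the
transformation in a UDF and apply it with withColumn, which returns a new
DataFrame. The plain-JDK string handling below is only a stand-in for whatever
the existing core-Java code does; in practice the function body could simply
delegate to a static method of that Java class.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object JavaUdfOnDataFrame {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("java-udf-on-dataframe").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "  Chetan  "), (2, "  Spark  ")).toDF("id", "name")

    // Core-Java logic: plain JDK string handling stands in here for whatever
    // the existing Java transformation actually does.
    val cleanName: String => String =
      s => if (s == null) null else s.trim.toUpperCase(java.util.Locale.ROOT)

    val cleanNameUdf = udf(cleanName)

    // Returns a new DataFrame; the original df is left untouched.
    val transformed = df.withColumn("name_clean", cleanNameUdf(col("name")))
    transformed.show()

    // The same function can also be registered for use from SQL.
    spark.udf.register("clean_name", cleanName)
    df.createOrReplaceTempView("people")
    spark.sql("select id, clean_name(name) as name_clean from people").show()

    spark.stop()
  }
}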


Re: Reply: GroupBy in Spark / Scala without Agg functions

2018-05-29 Thread Chetan Khatri
I see. Thank you for the explanation, Linyuxin.

On Wed, May 30, 2018 at 6:21 AM, Linyuxin  wrote:

> Hi,
>
> Why not group by first then join?
>
> BTW, I don’t think there is any difference between ‘distinct’ and ‘group by’
>
>
>
> Source code of 2.1:
>
> def distinct(): Dataset[T] = dropDuplicates()
>
> …
>
> def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
>
> …
>
> Aggregate(groupCols, aggCols, logicalPlan)
> }
>
>
>
>
>
>
>
>
>
> *From:* Chetan Khatri [mailto:chetan.opensou...@gmail.com]
> *Sent:* 30 May 2018 2:52
> *To:* Irving Duran 
> *Cc:* Georg Heiler ; user <
> user@spark.apache.org>
> *Subject:* Re: GroupBy in Spark / Scala without Agg functions
>
>
>
> Georg, sorry for the dumb question. Help me to understand: if I do
> DF.select(A, B, C, D).distinct(), would that be the same as the above groupBy
> without agg in SQL?
>
>
>
> On Wed, May 30, 2018 at 12:17 AM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
> I don't want to get any aggregation, just want to know rather saying
> distinct to all columns any other better approach ?
>
>
>
> On Wed, May 30, 2018 at 12:16 AM, Irving Duran 
> wrote:
>
> Unless you want to get a count, yes.
>
>
> Thank You,
>
> Irving Duran
>
>
>
>
>
> On Tue, May 29, 2018 at 1:44 PM Chetan Khatri 
> wrote:
>
> Georg, I just want to double check that someone wrote MSSQL Server script
> where it's groupby all columns. What is alternate best way to do distinct
> all columns ?
>
>
>
>
>
>
>
> On Wed, May 30, 2018 at 12:08 AM, Georg Heiler 
> wrote:
>
> Why do you group if you do not want to aggregate?
>
> Isn't this the same as select distinct?
>
>
>
> Chetan Khatri  schrieb am Di., 29. Mai 2018
> um 20:21 Uhr:
>
> All,
>
>
>
> I have scenario like this in MSSQL Server SQL where i need to do groupBy
> without Agg function:
>
>
>
> Pseudocode:
>
>
>
>
>
> select m.student_id, m.student_name, m.student_std, m.student_group,
> m.student_d
>
> ob from student as m inner join general_register g on m.student_id =
> g.student_i
>
> d group by m.student_id, m.student_name, m.student_std, m.student_group,
> m.student_dob
>
>
>
> I tried to doing in spark but i am not able to get Dataframe as return
> value, how this kind of things could be done in Spark.
>
>
>
> Thanks
>
>
>
>
>
>
>
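A quick way to convince yourself of the "distinct is the same as group by"
point is to compare the plans Spark produces; a tiny sketch (the data is made
up):

import org.apache.spark.sql.SparkSession

object DistinctVsGroupBy {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("distinct-vs-groupby").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (1, "a"), (2, "b")).toDF("id", "value")

    // distinct() delegates to dropDuplicates(), which plans an Aggregate over
    // all columns, so these two explain() outputs are identical.
    df.distinct().explain()
    df.dropDuplicates().explain()

    // A groupBy over every column (with a throwaway aggregate that is dropped
    // again) boils down to essentially the same aggregate, plus a projection.
    df.groupBy($"id", $"value").count().drop("count").explain()

    spark.stop()
  }
}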


Re: GroupBy in Spark / Scala without Agg functions

2018-05-29 Thread Chetan Khatri
Georg, sorry for the dumb question. Help me to understand: if I do
DF.select(A, B, C, D).distinct(), would that be the same as the above groupBy
without agg in SQL?

On Wed, May 30, 2018 at 12:17 AM, Chetan Khatri  wrote:

> I don't want to get any aggregation, just want to know rather saying
> distinct to all columns any other better approach ?
>
> On Wed, May 30, 2018 at 12:16 AM, Irving Duran 
> wrote:
>
>> Unless you want to get a count, yes.
>>
>> Thank You,
>>
>> Irving Duran
>>
>>
>> On Tue, May 29, 2018 at 1:44 PM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Georg, I just want to double check that someone wrote MSSQL Server
>>> script where it's groupby all columns. What is alternate best way to do
>>> distinct all columns ?
>>>
>>>
>>>
>>> On Wed, May 30, 2018 at 12:08 AM, Georg Heiler <
>>> georg.kf.hei...@gmail.com> wrote:
>>>
>>>> Why do you group if you do not want to aggregate?
>>>> Isn't this the same as select distinct?
>>>>
>>>> Chetan Khatri  schrieb am Di., 29. Mai
>>>> 2018 um 20:21 Uhr:
>>>>
>>>>> All,
>>>>>
>>>>> I have scenario like this in MSSQL Server SQL where i need to do
>>>>> groupBy without Agg function:
>>>>>
>>>>> Pseudocode:
>>>>>
>>>>>
>>>>> select m.student_id, m.student_name, m.student_std, m.student_group,
>>>>> m.student_d
>>>>> ob from student as m inner join general_register g on m.student_id =
>>>>> g.student_i
>>>>> d group by m.student_id, m.student_name, m.student_std,
>>>>> m.student_group, m.student_dob
>>>>>
>>>>> I tried to doing in spark but i am not able to get Dataframe as return
>>>>> value, how this kind of things could be done in Spark.
>>>>>
>>>>> Thanks
>>>>>
>>>>
>>>
>


Re: GroupBy in Spark / Scala without Agg functions

2018-05-29 Thread Chetan Khatri
I don't want any aggregation; I just want to know whether, rather than applying
distinct to all columns, there is any better approach.

On Wed, May 30, 2018 at 12:16 AM, Irving Duran 
wrote:

> Unless you want to get a count, yes.
>
> Thank You,
>
> Irving Duran
>
>
> On Tue, May 29, 2018 at 1:44 PM Chetan Khatri 
> wrote:
>
>> Georg, I just want to double check that someone wrote MSSQL Server script
>> where it's groupby all columns. What is alternate best way to do distinct
>> all columns ?
>>
>>
>>
>> On Wed, May 30, 2018 at 12:08 AM, Georg Heiler > > wrote:
>>
>>> Why do you group if you do not want to aggregate?
>>> Isn't this the same as select distinct?
>>>
>>> Chetan Khatri  schrieb am Di., 29. Mai
>>> 2018 um 20:21 Uhr:
>>>
>>>> All,
>>>>
>>>> I have scenario like this in MSSQL Server SQL where i need to do
>>>> groupBy without Agg function:
>>>>
>>>> Pseudocode:
>>>>
>>>>
>>>> select m.student_id, m.student_name, m.student_std, m.student_group,
>>>> m.student_d
>>>> ob from student as m inner join general_register g on m.student_id =
>>>> g.student_i
>>>> d group by m.student_id, m.student_name, m.student_std,
>>>> m.student_group, m.student_dob
>>>>
>>>> I tried to doing in spark but i am not able to get Dataframe as return
>>>> value, how this kind of things could be done in Spark.
>>>>
>>>> Thanks
>>>>
>>>
>>


Re: GroupBy in Spark / Scala without Agg functions

2018-05-29 Thread Chetan Khatri
Georg, I just want to double-check: someone wrote an MSSQL Server script where
it groups by all columns. What is the best alternative way to do a distinct
over all columns?



On Wed, May 30, 2018 at 12:08 AM, Georg Heiler 
wrote:

> Why do you group if you do not want to aggregate?
> Isn't this the same as select distinct?
>
> Chetan Khatri  schrieb am Di., 29. Mai 2018
> um 20:21 Uhr:
>
>> All,
>>
>> I have scenario like this in MSSQL Server SQL where i need to do groupBy
>> without Agg function:
>>
>> Pseudocode:
>>
>>
>> select m.student_id, m.student_name, m.student_std, m.student_group,
>> m.student_d
>> ob from student as m inner join general_register g on m.student_id =
>> g.student_i
>> d group by m.student_id, m.student_name, m.student_std, m.student_group,
>> m.student_dob
>>
>> I tried to doing in spark but i am not able to get Dataframe as return
>> value, how this kind of things could be done in Spark.
>>
>> Thanks
>>
>


GroupBy in Spark / Scala without Agg functions

2018-05-29 Thread Chetan Khatri
All,

I have a scenario like this in MSSQL Server SQL where I need to do a groupBy
without an Agg function:

Pseudocode:

select m.student_id, m.student_name, m.student_std, m.student_group, m.student_dob
from student as m
inner join general_register g on m.student_id = g.student_id
group by m.student_id, m.student_name, m.student_std, m.student_group, m.student_dob

I tried doing this in Spark but I am not able to get a DataFrame as the return
value. How could this kind of thing be done in Spark?

Thanks
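A minimal sketch of the usual translation, following the pseudocode above: do
the join, project the grouped columns, then deduplicate, since a GROUP BY with
no aggregate functions is effectively a DISTINCT over those columns.

import org.apache.spark.sql.SparkSession

object GroupByWithoutAgg {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("groupby-without-agg").getOrCreate()

    val student = spark.table("student")
    val generalRegister = spark.table("general_register")

    // Join, project the columns from the GROUP BY list, then deduplicate.
    val result = student.as("m")
      .join(generalRegister.as("g"),
        student("student_id") === generalRegister("student_id"))
      .select("m.student_id", "m.student_name", "m.student_std",
        "m.student_group", "m.student_dob")
      .dropDuplicates()

    result.show()
    spark.stop()
  }
}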


Re: Bulk / Fast Read and Write with MSSQL Server and Spark

2018-05-25 Thread Chetan Khatri
Ajay, you can use Sqoop if you want to ingest data to HDFS. This is a POC where
the customer wants to prove that Spark ETL would be faster than C#-based raw
SQL statements. That's all. There are no timestamp-based columns in the source
tables to make it an incremental load.

On Thu, May 24, 2018 at 1:08 AM, ayan guha <guha.a...@gmail.com> wrote:

> Curious question: what is the reason of using spark here? Why not simple
> sql-based ETL?
>
> On Thu, May 24, 2018 at 5:09 AM, Ajay <ajay.ku...@gmail.com> wrote:
>
>> Do you worry about spark overloading the SQL server?  We have had this
>> issue in the past where all spark slaves tend to send lots of data at once
>> to SQL and that slows down the latency of the rest of the system. We
>> overcame this by using sqoop and running it in a controlled environment.
>>
>> On Wed, May 23, 2018 at 7:32 AM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Super, just giving high level idea what i want to do. I have one source
>>> schema which is MS SQL Server 2008 and target is also MS SQL Server 2008.
>>> Currently there is c# based ETL application which does extract transform
>>> and load as customer specific schema including indexing etc.
>>>
>>>
>>> Thanks
>>>
>>> On Wed, May 23, 2018 at 7:11 PM, kedarsdixit <
>>> kedarnath_di...@persistent.com> wrote:
>>>
>>>> Yes.
>>>>
>>>> Regards,
>>>> Kedar Dixit
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>
>> --
>> Thanks,
>> Ajay
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Bulk / Fast Read and Write with MSSQL Server and Spark

2018-05-23 Thread Chetan Khatri
Super. Just giving a high-level idea of what I want to do: I have one source
schema which is MS SQL Server 2008, and the target is also MS SQL Server 2008.
Currently there is a C#-based ETL application which does extract, transform and
load into a customer-specific schema, including indexing etc.


Thanks

On Wed, May 23, 2018 at 7:11 PM, kedarsdixit  wrote:

> Yes.
>
> Regards,
> Kedar Dixit
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Bulk / Fast Read and Write with MSSQL Server and Spark

2018-05-23 Thread Chetan Khatri
Thank you Kedar Dixit, Silvio Fiorito.

Just one question: even if it's not an Azure cloud MS SQL Server, it should
support an MS SQL Server installed on a local machine, right?

Thank you.

On Wed, May 23, 2018 at 6:18 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Try this https://docs.microsoft.com/en-us/azure/sql-database/sql-
> database-spark-connector
>
>
>
>
>
> *From: *Chetan Khatri <chetan.opensou...@gmail.com>
> *Date: *Wednesday, May 23, 2018 at 7:47 AM
> *To: *user <user@spark.apache.org>
> *Subject: *Bulk / Fast Read and Write with MSSQL Server and Spark
>
>
>
> All,
>
>
>
> I am looking for approach to do bulk read / write with MSSQL Server and
> Apache Spark 2.2 , please let me know if any library / driver for the same.
>
>
>
> Thank you.
>
> Chetan
>


Bulk / Fast Read and Write with MSSQL Server and Spark

2018-05-23 Thread Chetan Khatri
All,

I am looking for an approach to do bulk read / write between MSSQL Server and
Apache Spark 2.2. Please let me know if there is any library / driver for this.

Thank you.
Chetan
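For anyone searching the archive later: a minimal sketch of doing this with
plain Spark JDBC and the Microsoft JDBC driver. Host, credentials, table names
and the partitioning bounds below are placeholders; the dedicated SQL Server /
Azure SQL Spark connector mentioned elsewhere in this thread is another option
for bulk loads.

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object MssqlJdbcReadWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mssql-jdbc-read-write").getOrCreate()

    // Placeholder connection details.
    val url = "jdbc:sqlserver://localhost:1433;databaseName=sourcedb"
    val props = new Properties()
    props.setProperty("user", "spark_user")
    props.setProperty("password", "secret")
    props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

    // Parallel read: partition the source table on a numeric column so each
    // executor pulls its own slice instead of one task reading everything.
    val source = spark.read
      .jdbc(url, "dbo.source_table", "id", 1L, 10000000L, 16, props)

    val transformed = source  // ...apply the actual transformations here...

    // Write back in parallel; batchsize controls the JDBC batch insert size.
    transformed.write
      .mode(SaveMode.Append)
      .option("batchsize", "10000")
      .jdbc(url, "dbo.target_table", props)

    spark.stop()
  }
}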


Livy Failed error on Yarn with Spark

2018-05-09 Thread Chetan Khatri
All,

I am running on Hortonworks HDP Hadoop with Livy and Spark 2.2.0. When I run
the same Spark job using spark-submit, it succeeds and all transformations are
done.

When I try to do the spark-submit through Livy, the Spark job gets invoked and
succeeds, but the YARN status says FAILED, and when you take a look at the logs
for the attempt, the log says SUCCESS and there is no error log.

Has anyone faced this weird experience?

Thank you.


Re: NLTK with Spark Streaming

2017-11-26 Thread Chetan Khatri
But you can still use the Stanford NLP library and distribute the work through
Spark, right?

On Sun, Nov 26, 2017 at 3:31 PM, Holden Karau  wrote:

> So it’s certainly doable (it’s not super easy mind you), but until the
> arrow udf release goes out it will be rather slow.
>
> On Sun, Nov 26, 2017 at 8:01 AM ashish rawat  wrote:
>
>> Hi,
>>
>> Has someone tried running NLTK (python) with Spark Streaming (scala)? I
>> was wondering if this is a good idea and what are the right Spark operators
>> to do this? The reason we want to try this combination is that we don't
>> want to run our transformations in python (pyspark), but after the
>> transformations, we need to run some natural language processing operations
>> and we don't want to restrict the functions data scientists' can use to
>> Spark natural language library. So, Spark streaming with NLTK looks like
>> the right option, from the perspective of fast data processing and data
>> science flexibility.
>>
>> Regards,
>> Ashish
>>
> --
> Twitter: https://twitter.com/holdenkarau
>
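A rough sketch of what "use the Stanford NLP library and distribute it through
Spark" can look like: build one CoreNLP pipeline per partition so the expensive
model load is not repeated for every record. This assumes the stanford-corenlp
jar and its models are on the executor classpath, and it is a plain batch
example, not wired into the streaming setup discussed above.

import java.util.Properties

import scala.collection.JavaConverters._

import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import org.apache.spark.sql.SparkSession

object CoreNlpOnSpark {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("corenlp-on-spark").getOrCreate()
    import spark.implicits._

    val texts = Seq("Spark is great.", "Stanford CoreNLP tags tokens.").toDS()

    val tagged = texts.mapPartitions { iter =>
      // Build the pipeline once per partition -- model loading is expensive.
      val props = new Properties()
      props.setProperty("annotators", "tokenize, ssplit, pos")
      val pipeline = new StanfordCoreNLP(props)

      iter.map { text =>
        val doc = new Annotation(text)
        pipeline.annotate(doc)
        val tokens = doc.get(classOf[CoreAnnotations.TokensAnnotation]).asScala
        tokens.map(t => s"${t.word()}/${t.tag()}").mkString(" ")
      }
    }

    tagged.show(truncate = false)
    spark.stop()
  }
}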


Re: Spark Writing to parquet directory : java.io.IOException: Disk quota exceeded

2017-11-22 Thread Chetan Khatri
Can anybody reply on this?

On Tue, Nov 21, 2017 at 3:36 PM, Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

>
> Hello Spark Users,
>
> I am getting below error, when i am trying to write dataset to parquet
> location. I have enough disk space available. Last time i was facing same
> kind of error which were resolved by increasing number of cores at hyper
> parameters. Currently result set data size is almost 400Gig with below
> hyper parameters
>
> Driver memory: 4g
> Executor Memory: 16g
> Executor cores=12
> num executors= 8
>
> Still it's failing, any Idea ? that if i increase executor memory and
> number of executors.  it could get resolved ?
>
>
> 17/11/21 04:29:37 ERROR storage.DiskBlockObjectWriter: Uncaught exception
> while reverting partial writes to file /mapr/chetan/local/david.com/
> tmp/hadoop/nm-local-dir/usercache/david-khurana/appcache/application_
> 1509639363072_10572/blockmgr-008604e6-37cb-421f-8cc5-
> e94db75684e7/12/temp_shuffle_ae885911-a1ef-404f-9a6a-ded544bb5b3c
> java.io.IOException: Disk quota exceeded
> at java.io.FileOutputStream.close0(Native Method)
> at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
> at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
> at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
> at java.io.FileOutputStream.close(FileOutputStream.java:354)
> at org.apache.spark.storage.TimeTrackingOutputStream.close(
> TimeTrackingOutputStream.java:72)
> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> at net.jpountz.lz4.LZ4BlockOutputStream.close(
> LZ4BlockOutputStream.java:178)
> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$
> anon$2.close(UnsafeRowSerializer.scala:96)
> at org.apache.spark.storage.DiskBlockObjectWriter$$
> anonfun$close$2.apply$mcV$sp(DiskBlockObjectWriter.scala:108)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.
> scala:1316)
> at org.apache.spark.storage.DiskBlockObjectWriter.close(
> DiskBlockObjectWriter.scala:107)
> at org.apache.spark.storage.DiskBlockObjectWriter.
> revertPartialWritesAndClose(DiskBlockObjectWriter.scala:159)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.
> stop(BypassMergeSortShuffleWriter.java:234)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:85)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 17/11/21 04:29:37 WARN netty.OneWayOutboxMessage: Failed to send one-way
> RPC.
> java.io.IOException: Failed to connect to /192.168.123.43:58889
> at org.apache.spark.network.client.TransportClientFactory.
> createClient(TransportClientFactory.java:228)
> at org.apache.spark.network.client.TransportClientFactory.
> createClient(TransportClientFactory.java:179)
> at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(
> NettyRpcEnv.scala:197)
> at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.
> scala:191)
> at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.
> scala:187)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: /
> 192.168.123.43:58889
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(
> SocketChannelImpl.java:717)
> at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(
> NioSocketChannel.java:224)
> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.
> finishConnect(AbstractNioChannel.java:289)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(
> NioEventLoop.java:528)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(
> NioEventLoop.java:468)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(
> NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:111)
>   ... 1 more
>


Spark Writing to parquet directory : java.io.IOException: Disk quota exceeded

2017-11-21 Thread Chetan Khatri
Hello Spark Users,

I am getting the below error when I try to write a dataset to a parquet
location. I have enough disk space available. Last time I was facing the same
kind of error, it was resolved by increasing the number of cores in the hyper
parameters. Currently the result set data size is almost 400 GB, with the below
hyper parameters:

Driver memory: 4g
Executor memory: 16g
Executor cores = 12
Num executors = 8

It is still failing. Any idea whether it could get resolved if I increase the
executor memory and the number of executors?


17/11/21 04:29:37 ERROR storage.DiskBlockObjectWriter: Uncaught exception
while reverting partial writes to file /mapr/chetan/local/
david.com/tmp/hadoop/nm-local-dir/usercache/david-khurana/appcache/application_1509639363072_10572/blockmgr-008604e6-37cb-421f-8cc5-e94db75684e7/12/temp_shuffle_ae885911-a1ef-404f-9a6a-ded544bb5b3c
java.io.IOException: Disk quota exceeded
at java.io.FileOutputStream.close0(Native Method)
at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
at java.io.FileOutputStream.close(FileOutputStream.java:354)
at
org.apache.spark.storage.TimeTrackingOutputStream.close(TimeTrackingOutputStream.java:72)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at
net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:178)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.close(UnsafeRowSerializer.scala:96)
at
org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$close$2.apply$mcV$sp(DiskBlockObjectWriter.scala:108)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1316)
at
org.apache.spark.storage.DiskBlockObjectWriter.close(DiskBlockObjectWriter.scala:107)
at
org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:159)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:234)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/11/21 04:29:37 WARN netty.OneWayOutboxMessage: Failed to send one-way
RPC.
java.io.IOException: Failed to connect to /192.168.123.43:58889
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:191)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /
192.168.123.43:58889
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
  ... 1 more


Re: No space left on device

2017-10-17 Thread Chetan Khatri
Process data in micro batch
On 18-Oct-2017 10:36 AM, "Chetan Khatri" <chetan.opensou...@gmail.com>
wrote:

> Your hard drive don't have much space
> On 18-Oct-2017 10:35 AM, "Mina Aslani" <aslanim...@gmail.com> wrote:
>
>> Hi,
>>
>> I get "No space left on device" error in my spark worker:
>>
>> Error writing stream to file /usr/spark-2.2.0/work/app-.../0/stderr
>> java.io.IOException: No space left on device
>>
>> In my spark cluster, I have one worker and one master.
>> My program consumes stream of data from kafka and publishes the result
>> into kafka. I set my RDD = null after I finish working, so that
>> intermediate shuffle files are removed quickly.
>>
>> How can I avoid "No space left on device"?
>>
>> Best regards,
>> Mina
>>
>


Re: No space left on device

2017-10-17 Thread Chetan Khatri
Your hard drive doesn't have much space.
On 18-Oct-2017 10:35 AM, "Mina Aslani"  wrote:

> Hi,
>
> I get "No space left on device" error in my spark worker:
>
> Error writing stream to file /usr/spark-2.2.0/work/app-.../0/stderr
> java.io.IOException: No space left on device
>
> In my spark cluster, I have one worker and one master.
> My program consumes stream of data from kafka and publishes the result
> into kafka. I set my RDD = null after I finish working, so that
> intermediate shuffle files are removed quickly.
>
> How can I avoid "No space left on device"?
>
> Best regards,
> Mina
>


Re: Spark - Partitions

2017-10-12 Thread Chetan Khatri
Use repartition
On 13-Oct-2017 9:35 AM, "KhajaAsmath Mohammed" 
wrote:

> Hi,
>
> I am reading hive query and wiriting the data back into hive after doing
> some transformations.
>
> I have changed setting spark.sql.shuffle.partitions to 2000 and since then
> job completes fast but the main problem is I am getting 2000 files for each
> partition
> size of file is 10 MB .
>
> is there a way to get same performance but write lesser number of files ?
>
> I am trying repartition now but would like to know if there are any other
> options.
>
> Thanks,
> Asmath
>
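As a small sketch of the two usual knobs (numbers and table names are
illustrative only): coalesce avoids a second full shuffle, repartition
rebalances the data evenly; both cut the number of output files after a wide
shuffle.

import org.apache.spark.sql.SparkSession

object ControlOutputFileCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("control-output-file-count")
      .enableHiveSupport()
      .getOrCreate()

    val transformed = spark.sql("select * from source_table")  // your transformation here

    // coalesce(n) narrows to n partitions without a full shuffle -- cheap, but
    // files can be uneven if the upstream partitions are skewed.
    transformed.coalesce(8)
      .write.mode("overwrite").saveAsTable("target_table_coalesced")

    // repartition(n) does a full shuffle into n evenly sized partitions -- more
    // expensive, but the n output files end up balanced.
    transformed.repartition(8)
      .write.mode("overwrite").saveAsTable("target_table_repartitioned")

    spark.stop()
  }
}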


Re: Write only one output file in Spark SQL

2017-08-11 Thread Chetan Khatri
What you can do is create the Hive table with a partition column, for example
date, then use val finalDf = df.repartition(df.col("date-column")) and later
run insert overwrite table tablename partition(date-column) select * from
tempview.

That would work as expected.
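A minimal sketch of that approach (table and column names are placeholders),
written against the Spark 2.x SparkSession API; on 1.6 the same statements go
through HiveContext. It assumes the target Hive table is partitioned by the
date column and dynamic partitioning is allowed.

import org.apache.spark.sql.SparkSession

object OneFilePerHivePartition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("one-file-per-hive-partition")
      .enableHiveSupport()
      .getOrCreate()

    // May be required for INSERT OVERWRITE with dynamic partitions.
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    val df = spark.table("staging_table")

    // One shuffle partition per distinct date means roughly one output file per
    // Hive partition, instead of spark.sql.shuffle.partitions files each.
    val finalDf = df.repartition(df.col("event_date"))
    finalDf.createOrReplaceTempView("tempview")

    // Dynamic partitioning expects the partition column to come last in the select.
    spark.sql(
      "insert overwrite table target_table partition(event_date) " +
        "select * from tempview")

    spark.stop()
  }
}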
On 11-Aug-2017 11:03 PM, "KhajaAsmath Mohammed" 
wrote:

> we had spark.sql.partitions as 4 but in hdfs it is ending up with 200
> files and 4 files are actually having data and rest of them are having zero
> bytes.
>
> My only requirement is to run fast for hive insert overwrite query from
> spark temporary table and end up having less files instead of more files
> with zero bytes.
>
> I am using spark sql query of hive insert overwite not the write method on
> dataframe as it is not supported in 1.6 version of spark for kerberos
> cluster.
>
>
> On Fri, Aug 11, 2017 at 12:23 PM, Lukas Bradley 
> wrote:
>
>> Please show the write() call, and the results in HDFS.  What are all the
>> files you see?
>>
>> On Fri, Aug 11, 2017 at 1:10 PM, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> tempTable = union_df.registerTempTable("tempRaw")
>>>
>>> create = hc.sql('CREATE TABLE IF NOT EXISTS blab.pyspark_dpprq (vin
>>> string, utctime timestamp, description string, descriptionuom string,
>>> providerdesc string, dt_map string, islocation string, latitude double,
>>> longitude double, speed double, value string)')
>>>
>>> insert = hc.sql('INSERT OVERWRITE TABLE blab.pyspark_dpprq SELECT * FROM
>>> tempRaw')
>>>
>>>
>>>
>>>
>>> On Fri, Aug 11, 2017 at 11:00 AM, Daniel van der Ende <
>>> daniel.vandere...@gmail.com> wrote:
>>>
 Hi Asmath,

 Could you share the code you're running?

 Daniel

 On Fri, 11 Aug 2017, 17:53 KhajaAsmath Mohammed, <
 mdkhajaasm...@gmail.com> wrote:

> Hi,
>
>
>
> I am using spark sql to write data back to hdfs and it is resulting in
> multiple output files.
>
>
>
> I tried changing number spark.sql.shuffle.partitions=1 but it
> resulted in very slow performance.
>
>
>
> Also tried coalesce and repartition still the same issue. any
> suggestions?
>
>
>
> Thanks,
>
> Asmath
>

>>>
>>
>


Re: Reparitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-03 Thread Chetan Khatri
Thanks Holden !


On Thu, Aug 3, 2017 at 4:02 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

> The memory overhead is based less on the total amount of data and more on
> what you end up doing with the data (e.g. if your doing a lot of off-heap
> processing or using Python you need to increase it). Honestly most people
> find this number for their job "experimentally" (e.g. they try a few
> different things).
>
> On Wed, Aug 2, 2017 at 1:52 PM, Chetan Khatri <chetan.opensou...@gmail.com
> > wrote:
>
>> Ryan,
>> Thank you for reply.
>>
>> For 2 TB of Data what should be the value of
>> spark.yarn.executor.memoryOverhead = ?
>>
>> with regards to this - i see issue at spark
>> https://issues.apache.org/jira/browse/SPARK-18787 , not sure whether it
>> works or not at Spark 2.0.1  !
>>
>> can you elaborate more for spark.memory.fraction setting.
>>
>> number of partitions = 674
>> Cluster: 455 GB total memory, VCores: 288, Nodes: 17
>> Given / tried memory config: executor-mem = 16g, num-executor=10,
>> executor cores=6, driver mem=4g
>>
>> spark.default.parallelism=1000
>> spark.sql.shuffle.partitions=1000
>> spark.yarn.executor.memoryOverhead=2048
>> spark.shuffle.io.preferDirectBufs=false
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Aug 2, 2017 at 10:43 PM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Chetan,
>>>
>>> When you're writing to a partitioned table, you want to use a shuffle to
>>> avoid the situation where each task has to write to every partition. You
>>> can do that either by adding a repartition by your table's partition keys,
>>> or by adding an order by with the partition keys and then columns you
>>> normally use to filter when reading the table. I generally recommend the
>>> second approach because it handles skew and prepares the data for more
>>> efficient reads.
>>>
>>> If that doesn't help, then you should look at your memory settings. When
>>> you're getting killed by YARN, you should consider setting `
>>> spark.shuffle.io.preferDirectBufs=false` so you use less off-heap
>>> memory that the JVM doesn't account for. That is usually an easier fix than
>>> increasing the memory overhead. Also, when you set executor memory, always
>>> change spark.memory.fraction to ensure the memory you're adding is used
>>> where it is needed. If your memory fraction is the default 60%, then 60% of
>>> the memory will be used for Spark execution, not reserved whatever is
>>> consuming it and causing the OOM. (If Spark's memory is too low, you'll see
>>> other problems like spilling too much to disk.)
>>>
>>> rb
>>>
>>> On Wed, Aug 2, 2017 at 9:02 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Can anyone please guide me with above issue.
>>>>
>>>>
>>>> On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri <
>>>> chetan.opensou...@gmail.com> wrote:
>>>>
>>>>> Hello Spark Users,
>>>>>
>>>>> I have Hbase table reading and writing to Hive managed table where i
>>>>> applied partitioning by date column which worked fine but it has generate
>>>>> more number of files in almost 700 partitions but i wanted to use
>>>>> reparation to reduce File I/O by reducing number of files inside each
>>>>> partition.
>>>>>
>>>>> *But i ended up with below exception:*
>>>>>
>>>>> ExecutorLostFailure (executor 11 exited caused by one of the running
>>>>> tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
>>>>> GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.
>>>>> memoryOverhead.
>>>>>
>>>>> Driver memory=4g, executor mem=12g, num-executors=8, executor core=8
>>>>>
>>>>> Do you think below setting can help me to overcome above issue:
>>>>>
>>>>> spark.default.parellism=1000
>>>>> spark.sql.shuffle.partitions=1000
>>>>>
>>>>> Because default max number of partitions are 1000.
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
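Pulling the two suggestions above together, a rough sketch (column names, table
names and the memory numbers are illustrative only): shuffle by the table's
partition key and sort within partitions by the usual filter columns before
writing, and account for off-heap memory in the job configuration.

import org.apache.spark.sql.SparkSession

object PartitionedHiveWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partitioned-hive-write")
      // Memory-related settings discussed in the thread; values are examples.
      .config("spark.shuffle.io.preferDirectBufs", "false")
      .config("spark.yarn.executor.memoryOverhead", "2048")
      .enableHiveSupport()
      .getOrCreate()

    val fromHbase = spark.table("staging_from_hbase")

    // Shuffle by the partition key so each task writes to few partitions, and
    // sort within partitions by the columns commonly used to filter on read.
    val prepared = fromHbase
      .repartition(fromHbase.col("load_date"))
      .sortWithinPartitions("load_date", "customer_id")

    prepared.write
      .mode("overwrite")
      .partitionBy("load_date")
      .saveAsTable("warehouse.partitioned_table")

    spark.stop()
  }
}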

