Understanding hive query plan for Join operation

2016-09-17 Thread Nitin Kumar
Hi,

I have a query and its associated query plan
<https://gist.github.com/NitinKumar94/0553b401a8842d5b2f688d6db3e2d23b> for
simulated data.

The number of rows in the table lte_data_tenmillion is 1000
The number of rows in the table subscriber_data is 10

*For both tables, none of the rows have a null value in the subscriber_id
column.*

I'm finding it difficult to understand why the query plan displays the
number of rows scanned (after applying predicate: subscriber_id is not null
(type: boolean)) as exactly half the original number of rows.

The same is the case with the other filter operator.

Also, the total number of rows of the resulting data, as reported under
"File Output Operator [FS_20]", is 550. However, the actual number of
rows in the resulting table is 2499723.

I might be interpreting the query plan incorrectly. I would highly appreciate
it if someone could clear up the inconsistencies I observe between the query
plan and the actual result.

Thanks and regards,
Nitin Kumar
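
A note on interpreting the plan: the row counts shown under each operator are
the optimizer's estimates rather than actual scan counts, and when column
statistics are missing Hive generally falls back to default selectivity
heuristics for predicates such as "subscriber_id is not null", which could
account for the halving. Below is a hedged sketch of how one might gather
column statistics and re-check the estimates; the table names are approximately
as they appear in the thread, and the join is only a stand-in for the actual
query in the gist.

-- Gather column statistics for the join keys, then re-run EXPLAIN and
-- compare the Filter/Join row estimates against the earlier defaults.
ANALYZE TABLE lte_data_tenmillion COMPUTE STATISTICS FOR COLUMNS subscriber_id;
ANALYZE TABLE subscriber_data COMPUTE STATISTICS FOR COLUMNS subscriber_id;

EXPLAIN
SELECT a.*
FROM lte_data_tenmillion a
JOIN subscriber_data b
  ON a.subscriber_id = b.subscriber_id;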


Re: Populating tables using hive and spark

2016-08-22 Thread Nitin Kumar
Hi Mich!

There is no problem in displaying records or performing any aggregations on
the records after inserting data from spark into the hive table. It is the
count query (in hive) that returns the wrong result prior to issuing the
compute statistics command.
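
A hedged sketch of the sequence described above, using the testme table from
the test quoted below (the comments reflect what was reported in this thread,
not a verified reproduction):

-- Count answered from stale table statistics after the Spark-side insert:
select count(*) from testme;              -- reported to return 2, not 4

-- Refresh the basic table statistics, then count again:
analyze table testme compute statistics;
select count(*) from testme;              -- now returns the correct count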

On Mon, Aug 22, 2016 at 4:50 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Ok This is my test
>
> 1) create table in Hive and populate it with two rows
>
> hive> create table testme (col1 int, col2 string);
> OK
> hive> insert into testme values (1,'London');
> Query ID = hduser_20160821212812_2a8384af-23f1-4f28-9395-a99a5f4c1a4a
> OK
> hive> insert into testme values (2,'NY');
> Query ID = hduser_20160821212812_2a8384af-23f1-4f28-9395-a99a5f4c1a4a
> OK
> hive> select * from testme;
> OK
> 1   London
> 2   NY
>
> So the rows are there
>
> Now use  Spark to create two more rows
>
> scala> case class columns (col1: Int, col2: String)
> defined class columns
> scala> val df =sc.parallelize(Array((3,"California"),(4,"Dehli"))).map(p
> => columns(p._1.toString.toInt, p._2.toString)).toDF()
> df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]
> scala> df.show
> +----+----------+
> |col1|      col2|
> +----+----------+
> |   3|California|
> |   4|     Dehli|
> +----+----------+
>
> // register it as tempTable
> scala> df.registerTempTable("tmp")
> scala> sql("insert into test.testme select * from tmp")
> res9: org.apache.spark.sql.DataFrame = []
> scala> sql("select * from testme").show
> +----+----------+
> |col1|      col2|
> +----+----------+
> |   1|    London|
> |   2|        NY|
> |   3|California|
> |   4|     Dehli|
> +----+----------+
> So the rows are there.
>
> Let me go to Hive again now
>
Try and hit a count query in hive on testme here... it seems to return a
value of 2

>
> hive>  select * from testme;
> OK
> 1   London
> 2   NY
> 3   California
> 4   Dehli
>
> hive> analyze table testme compute statistics for columns;
>
> So is there any issue here?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 22 August 2016 at 11:51, Nitin Kumar <nk94.nitinku...@gmail.com> wrote:
>
>> Hi Furcy,
>>
>> If I execute the command "ANALYZE TABLE TEST_ORC COMPUTE STATISTICS"
>> before checking the count from hive, Hive returns the correct count, although
>> it does not spawn a map-reduce job for computing the count.
>>
>> I'm running a HDP 2.4 Cluster with Hive 1.2.1.2.4 and Spark 1.6.1
>>
>> If others can concur we can go ahead and report it as a bug.
>>
>> Regards,
>> Nitin
>>
>>
>>
>> On Mon, Aug 22, 2016 at 4:15 PM, Furcy Pin <furcy@flaminem.com>
>> wrote:
>>
>>> Hi Nitin,
>>>
>>> I confirm that there is something odd here.
>>>
>>> I did the following test:
>>>
>>> create table test_orc (id int, name string, dept string) stored as ORC;
>>> insert into table test_orc values (1, 'abc', 'xyz');
>>> insert into table test_orc values (2, 'def', 'xyz');
>>> insert into table test_orc values (3, 'pqr', 'xyz');
>>> insert into table test_orc values (4, 'ghi', 'xyz');
>>>
>>>
>>> I ended up with 4 files on hdfs:
>>>
>>> 00_0
>>> 00_0_copy_1
>>> 00_0_copy_2
>>> 00_0_copy_3
>>>
>>>
>>> Then I renamed 00_0_copy_2 to part-0, and I still got COUNT(*) =
>>> 4 with hive.
>>> So this is not a file name issue.
>>>
>>> I then removed one of the files, and I got this :
>>>
>>> > SELECT COUNT(1) FROM test_orc ;
>>> +------+--+
>>> | _c0  |
>>> +------+--+
>>> | 4    |
>>> +------+--+
>>>
>>> > SELECT * FROM test_orc ;
>>> +--------------+----------------+----------------+--+
>>> | test_orc.id  | 

Re: Populating tables using hive and spark

2016-08-22 Thread Nitin Kumar
Hi Furcy,

If I execute the command "ANALYZE TABLE TEST_ORC COMPUTE STATISTICS" before
checking the count from hive, Hive returns the correct count, although it does
not spawn a map-reduce job for computing the count.

I'm running a HDP 2.4 Cluster with Hive 1.2.1.2.4 and Spark 1.6.1

If others can concur we can go ahead and report it as a bug.

Regards,
Nitin



On Mon, Aug 22, 2016 at 4:15 PM, Furcy Pin <furcy@flaminem.com> wrote:

> Hi Nitin,
>
> I confirm that there is something odd here.
>
> I did the following test:
>
> create table test_orc (id int, name string, dept string) stored as ORC;
> insert into table test_orc values (1, 'abc', 'xyz');
> insert into table test_orc values (2, 'def', 'xyz');
> insert into table test_orc values (3, 'pqr', 'xyz');
> insert into table test_orc values (4, 'ghi', 'xyz');
>
>
> I ended up with 4 files on hdfs:
>
> 00_0
> 00_0_copy_1
> 00_0_copy_2
> 00_0_copy_3
>
>
> Then I renamed 00_0_copy_2 to part-0, and I still got COUNT(*) = 4
> with hive.
> So this is not a file name issue.
>
> I then removed one of the files, and I got this :
>
> > SELECT COUNT(1) FROM test_orc ;
> +------+--+
> | _c0  |
> +------+--+
> | 4    |
> +------+--+
>
> > SELECT * FROM test_orc ;
> +--------------+----------------+----------------+--+
> | test_orc.id  | test_orc.name  | test_orc.dept  |
> +--------------+----------------+----------------+--+
> | 1            | abc            | xyz            |
> | 2            | def            | xyz            |
> | 4            | ghi            | xyz            |
> +--------------+----------------+----------------+--+
> 3 rows selected (0.162 seconds)
>
> So, my guess is that when Hive inserts data, it must keep somewhere in the
> metastore the number of rows in the table.
> However, if the files are modified by something other than Hive itself
> (either manually or with Spark), you end up with an inconsistency.
>
> So I guess we can call it a bug:
>
> Hive should detect that the files changed and invalidate its
> pre-calculated count.
> Optionally, Spark should be nice to Hive and update the count when
> inserting.
>
> I don't know if this bug has already been reported, and I tested on Hive
> 1.1.0, so perhaps it is already solved in later releases.
>
> Regards,
>
> Furcy
>
>
> On Mon, Aug 22, 2016 at 9:34 AM, Nitin Kumar <nk94.nitinku...@gmail.com>
> wrote:
>
>> Hi!
>>
>> I've noticed that hive has problems in registering new data records if
>> the same table is written to using both the hive terminal and spark sql.
>> The problem is demonstrated through the commands listed below
>>
>> 
>> hive> use default;
>> hive> create table test_orc (id int, name string, dept string) stored as
>> ORC;
>> hive> insert into table test_orc values (1, 'abc', 'xyz');
>> hive> insert into table test_orc values (2, 'def', 'xyz');
>> hive> select count(*) from test_orc;
>> OK
>> 2
>> hive> select distinct(name) from test_orc;
>> OK
>> abc
>> def
>>
>> *** files in hdfs path in warehouse for the created table ***
>>
>>
>>
>> >>> data_points = [(3, 'pqr', 'xyz'), (4, 'ghi', 'xyz')]
>> >>> column_names = ['identity_id', 'emp_name', 'dept_name']
>> >>> data_df = sqlContext.createDataFrame(data_points, column_names)
>> >>> data_df.show()
>>
>> +-----------+--------+---------+
>> |identity_id|emp_name|dept_name|
>> +-----------+--------+---------+
>> |          3|     pqr|      xyz|
>> |          4|     ghi|      xyz|
>> +-----------+--------+---------+
>>
>> >>> data_df.registerTempTable('temp_table')
>> >>> sqlContext.sql('insert into table default.test_orc select * from
>> temp_table')
>>
>> *** files in hdfs path in warehouse for the created table ***
>>
>> hive> select count(*) from test_orc; (Does not launch map-reduce job)
>> OK
>> 2
>> hive> select distinct(name) from test_orc; (Launches map-reduce job)
>> abc
>> def
>> ghi
>> pqr
>>
>> hive> create table test_orc_new like test_orc stored as ORC;
>> hive> insert into table test_orc_new select * from test_orc;
>> hive> select count(*) from test_orc_new;
>> OK
>> 4
>> ==
>>
>> Even if I restart the hive services I cannot get the proper count output
>> from hive. This problem only occurs if the table is written to using both
>> hive and spark. If only spark is used to insert records into the table
>> multiple times, the count query in the hive terminal works perfectly fine.
>>
>> This problem occurs for tables stored with different storage formats as
>> well (textFile etc.)
>>
>> Is this because of the different naming conventions used by hive and
>> spark to write records to hdfs? Or maybe it is not a recommended practice
>> to write tables using different services?
>>
>> Your thoughts and comments on this matter would be highly appreciated!
>>
>> Thanks!
>> Nitin
>>
>>
>>
>


Populating tables using hive and spark

2016-08-22 Thread Nitin Kumar
Hi!

I've noticed that hive has problems in registering new data records if the
same table is written to using both the hive terminal and spark sql. The
problem is demonstrated through the commands listed below


hive> use default;
hive> create table test_orc (id int, name string, dept string) stored as
ORC;
hive> insert into table test_orc values (1, 'abc', 'xyz');
hive> insert into table test_orc values (2, 'def', 'xyz');
hive> select count(*) from test_orc;
OK
2
hive> select distinct(name) from test_orc;
OK
abc
def

*** files in hdfs path in warehouse for the created table ***


>>> data_points = [(3, 'pqr', 'xyz'), (4, 'ghi', 'xyz')]
>>> column_names = ['identity_id', 'emp_name', 'dept_name']
>>> data_df = sqlContext.createDataFrame(data_points, column_names)
>>> data_df.show()

+-----------+--------+---------+
|identity_id|emp_name|dept_name|
+-----------+--------+---------+
|          3|     pqr|      xyz|
|          4|     ghi|      xyz|
+-----------+--------+---------+

>>> data_df.registerTempTable('temp_table')
>>> sqlContext.sql('insert into table default.test_orc select * from
temp_table')

*** files in hdfs path in warehouse for the created table ***

hive> select count(*) from test_orc; (Does not launch map-reduce job)
OK
2
hive> select distinct(name) from test_orc; (Launches map-reduce job)
abc
def
ghi
pqr

hive> create table test_orc_new like test_orc stored as ORC;
hive> insert into table test_orc_new select * from test_orc;
hive> select count(*) from test_orc_new;
OK
4
==

Even if I restart the hive services I cannot get the proper count output
from hive. This problem only occurs if the table is written to using both
hive and spark. If only spark is used to insert records into the table
multiple times, the count query in the hive terminal works perfectly fine.
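
A possible workaround sketch, assuming (as suggested elsewhere in this thread)
that the stale count comes from table statistics stored in the metastore and
used to answer count(*) without a scan. The property and commands below are
standard Hive ones, but the exact behaviour on this setup is not verified here:

-- Inspect the statistics Hive has cached for the table; numRows appears
-- under "Table Parameters" and likely still reflects the pre-Spark inserts.
describe formatted test_orc;

-- Option 1: stop answering count(*) from statistics and force a real job.
set hive.compute.query.using.stats=false;
select count(*) from test_orc;

-- Option 2: refresh the statistics after writing from Spark.
analyze table test_orc compute statistics;
select count(*) from test_orc;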

This problem occurs for tables stored with different storage formats as
well (textFile etc.)

Is this because of the different naming conventions used by hive and spark
to write records to hdfs? Or maybe it is not a recommended practice to
write tables using different services?

Your thoughts and comments on this matter would be highly appreciated!

Thanks!
Nitin


Varying vcores/ram for hive queries running Tez engine

2016-04-25 Thread Nitin Kumar
I was trying to benchmark some hive queries. I am using the tez execution
engine. I varied the values of the following properties:

   1. hive.tez.container.size
   2. tez.task.resource.memory.mb
   3. tez.task.resource.cpu.vcores

Changes in the value of property 1 are reflected properly. However, it seems
that hive does not respect changes in the value of property 3; it always
allocates one vcore per requested container (the RM is configured to use the
DominantResourceCalculator). This got me thinking about the precedence of
property values in hive and tez.
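
For reference, this is roughly how the settings being varied would appear at
the top of an .hql file (the property names are the ones listed above; the
values are illustrative only):

set hive.execution.engine=tez;
set hive.tez.container.size=2048;        -- property 1, in MB
set tez.task.resource.memory.mb=2048;    -- property 2, in MB
set tez.task.resource.cpu.vcores=2;      -- property 3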

I have the following questions with respect to these configurations:

   1. Does hive respect the set values for properties 2 and 3 at all?
   2. If I set property 1 to a value of say 2048 MB and property 2 is set to a
   value of say 1024 MB, does this mean that I am wasting about a GB of memory
   for each spawned container?
   3. Is there a property in hive similar to property 1 that allows me to use
   the 'set' command in the .hql file to specify the number of vcores to use
   per container?
   4. Changes in value for the property tez.am.resource.cpu.vcores are
   reflected at runtime. However I do not observe the same behaviour with
   property 3. Are there other configurations that take precedence over it?

Your inputs and suggestions would be highly appreciated.

Thanks!


PS: Tests conducted on a 5 node cluster running HDP 2.3.0


Managing input split sizes in Hive running the tez engine

2016-04-20 Thread Nitin Kumar
Hi,

I want to gain a better understanding of how the input splits are
calculated in the tez engine.

I am aware that the *hive.input.format* property can be set to either
*HiveInputFormat* (default) or to *CombineHiveInputFormat* (generally
accepted for a large number of files with sizes << the hdfs block size).

I was hoping someone could walk me through the differences in how
*HiveInputFormat* and *CombineHiveInputFormat* calculate split sizes as
data file sizes vary from small (less than a block) to large (spanning
multiple blocks).

I want to dictate the number of mapper tasks that are spawned for scanning
a table. For the MR engine this can be controlled by setting the
*mapred.min.split.size* and *mapred.max.split.size* properties. I need to
know if there are similar configurations for the tez engine.
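
For comparison, a sketch of the MR-engine controls mentioned above (the values
are illustrative only, not recommendations):

-- MR engine: combine small files and bound the split size.
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set mapred.min.split.size=16777216;      -- 16 MB
set mapred.max.split.size=268435456;     -- 256 MB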

Also, the properties *tez.grouping.max-size*, *tez.grouping.min-size*
and *tez.grouping.split-waves* have been set to the values of 1GB, 16MB and
1.7 respectively. However, I observed that the created input splits do not
adhere to these properties.
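
Spelled out, the Tez-side settings described above would look something like
this in an .hql file (a sketch with the values mentioned; whether grouping
actually takes effect can also depend on the input format and the files being
read):

set hive.execution.engine=tez;
set tez.grouping.max-size=1073741824;    -- 1 GB
set tez.grouping.min-size=16777216;      -- 16 MB
set tez.grouping.split-waves=1.7;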

I had two files of size 3MB each for a table. According to the set
properties, only 1 mapper task should have been spawned, but 2 mapper tasks
were spawned instead.

Are there other properties in hive/tez that need to be set to enable input
split grouping?

I would highly appreciate your inputs.

Thanks and regards,
Nitin