This question is mostly a follow-up to my earlier mail (below).
I’m re-consuming this data, one (5GB) csv file at a time.
I see that in consuming this file, there was one failed reduce task. In the
output, I see a stack trace that I’m guessing is related.
So, 2 questions:
1 – does this mean that data from the CSV file failed to be saved to HBase? Or
is the mapreduce job smart enough to re-try the failed reducer? The reason I
ask is that in the “Job History” GUI, I see that there were 24 Total Reducers,
24 Successful Reducers, and 1 Failed Reducer. So if 1 failed but all 24
completed successfully, does that mean that the failed reducer was restarted
and finished?
2 – If this represents missing data, then is that a bug?
15/06/23 10:48:42 INFO mapreduce.Job: Job job_1433177972202_5025 completed
successfully
15/06/23 10:48:42 INFO mapreduce.Job: Counters: 52
File System Counters
FILE: Number of bytes read=119032798843
FILE: Number of bytes written=178387531471
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=5243131310
HDFS: Number of bytes written=702177539
HDFS: Number of read operations=315
HDFS: Number of large read operations=0
HDFS: Number of write operations=72
Job Counters
Failed reduce tasks=1
Launched map tasks=39
Launched reduce tasks=25
Data-local map tasks=33
Rack-local map tasks=6
Total time spent by all maps in occupied slots (ms)=14443554
Total time spent by all reduces in occupied slots (ms)=11439806
Total time spent by all map tasks (ms)=14443554
Total time spent by all reduce tasks (ms)=11439806
Total vcore-seconds taken by all map tasks=14443554
Total vcore-seconds taken by all reduce tasks=11439806
Total megabyte-seconds taken by all map tasks=103531395072
Total megabyte-seconds taken by all reduce tasks=82000529408
Map-Reduce Framework
Map input records=56330988
Map output records=563309880
Map output bytes=58314387712
Map output materialized bytes=59441013088
Input split bytes=5694
Combine input records=0
Combine output records=0
Reduce input groups=56031172
Reduce shuffle bytes=59441013088
Reduce input records=563309880
Reduce output records=560311720
Spilled Records=1689929640
Shuffled Maps =936
Failed Shuffles=0
Merged Map outputs=936
GC time elapsed (ms)=248681
CPU time spent (ms)=9564070
Physical memory (bytes) snapshot=144845623296
Virtual memory (bytes) snapshot=399930961920
Total committed heap usage (bytes)=220049965056
Phoenix MapReduce Import
Upserts Done=56330988
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=5243095430
File Output Format Counters
Bytes Written=702177539
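One way to double-check question 1 (a rough sketch, assuming the Hadoop 2.x
"mapred job" CLI is available on the node where I submit jobs) is to list the
reduce task attempts for the job id above and see whether the failed attempt was
followed by a completed retry:

# List reduce attempts by state for the job above; since the job finished
# successfully, a completed attempt for every reduce task would mean the failed
# attempt was simply retried rather than lost.
mapred job -list-attempt-ids job_1433177972202_5025 REDUCE failed
mapred job -list-attempt-ids job_1433177972202_5025 REDUCE completed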
From: Riesland, Zack
Sent: Tuesday, June 23, 2015 9:20 AM
To: '[email protected]'
Subject: RE: How To Count Rows In Large Phoenix Table?
Anil: Thanks for the tip about mapreduce.RowCounter. That takes about 70
minutes, but it works!
Unfortunately, I only got about 60% of the rows I’m expecting.
Gabriel (and anyone interested):
Thanks for your response!
A few details to give context to my question:
Our cluster has 6 region servers (256 GB RAM and 8 large drives each).
Our table has about 8 billion rows, with about a dozen columns each. This is
mostly time-series data with some details about each value.
For any given ‘node’, which is identified by the key, there are about 2,000 –
3,000 rows.
These rows each have a different timestamp in one of the columns.
The key + timestamp is the primary key.
I created the table using Phoenix, so I don’t know what it does under the
covers as far as column families.
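For reference, one quick way to see what Phoenix did under the covers is to
describe the table from the HBase shell (a rough sketch; REDACTED_TABLE stands
in for the real, redacted table name):

# Prints the table's column families and their settings (compression, versions, etc.)
echo "describe 'REDACTED_TABLE'" | hbase shell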
My script does something like this:
CREATE TABLE xxxxxx (
AAAA varchar not null,
xxxx integer,
xxxxxx varchar,
xxxxxx varchar,
xxxxxx integer,
BBBB integer not null, --unique-to-the-node timestamp of the value
xxxxxx integer,
xxxxxxx integer,
xxxxxxx varchar,
xxxxxxxx decimal(19,6),
xxxxxxxx decimal(19,6)
CONSTRAINT pk_redacted PRIMARY KEY (AAAA,BBBB)
)
COMPRESSION='GZ'
SPLIT ON ('AZ', 'BZ', 'CZ', 'DZ', 'EZ', 'FZ', 'GZ', 'HZ', 'IZ', 'JZ', 'KZ',
'LZ', 'MZ', 'NZ', 'OZ', 'PZ', 'RZ', 'SZ', 'TZ', 'UZ', 'VZ', 'WZ');
As you can see, I split the table alphabetically: one region per letter of the
alphabet. Since our keys are based on customer ID (they start with a four-letter
customer ID), there are a couple of letters with no entries, so we end up with 4
regions per region server (24 total regions).
When I drill into the table from Ambari, I see that it is fairly
well-distributed. Most regions have 1 million-4 million requests. A few have
hundreds of thousands.
Queries against the table are very fast. Basically instant.
When I try to consume all the data at once via CsvBulkLoad, it runs for several
hours. What eventually happens is that more and more map tasks fail (and retry)
as more and more regions are busy compacting. This eventually hits a threshold
where the application master decides to fail the whole job.
For my select count(*) query that fails, I believe it is a timeout issue:
java.lang.RuntimeException: org.apache.phoenix.exception.PhoenixIOException:
org.apache.phoenix.exception.PhoenixIOException: Failed after attempts=36,
exceptions:
Tue Jun 23 07:53:36 EDT 2015, null, java.net.SocketTimeoutException:
callTimeout=60000, callDuration=108925: row '' on table 'redacted' at
region=redacted,,1434377989918.552c1ed6d6d0c65ec30f467ed11ae0c3.,
hostname=redacted,60020,1434375519767, seqNum=2
at sqlline.SqlLine$IncrementalRows.hasNext(SqlLine.java:2440)
at sqlline.SqlLine$TableOutputFormat.print(SqlLine.java:2074)
at sqlline.SqlLine.print(SqlLine.java:1735)
at sqlline.SqlLine$Commands.execute(SqlLine.java:3683)
at sqlline.SqlLine$Commands.sql(SqlLine.java:3584)
at sqlline.SqlLine.dispatch(SqlLine.java:821)
at sqlline.SqlLine.begin(SqlLine.java:699)
at sqlline.SqlLine.mainWithInputRedirection(SqlLine.java:441)
at sqlline.SqlLine.main(SqlLine.java:424)
I am running the query from a region server node by cd’ing into
/usr/hdp/2.2.0.0-2041/phoenix/bin and calling ./sqlline.py <params>
I created /usr/hdp/2.2.0.0-2041/phoenix/bin/hbase-site.xml and added the
configuration below, but it doesn’t seem to ‘stick’:
<configuration>
<property>
<name>phoenix.query.timeoutMs</name>
<value>900000</value>
</property>
</configuration>
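A rough sketch of the kind of change that might make it stick (assuming an HDP
layout where the Phoenix client picks up the HBase client config from
/etc/hbase/conf, and that the HBase-level timeouts usually need to be raised
alongside the Phoenix one; the paths and values here are illustrative):

# Put the overrides in the hbase-site.xml that is actually on the client
# classpath (on HDP that is typically /etc/hbase/conf/hbase-site.xml):
#   phoenix.query.timeoutMs             -> 900000
#   hbase.rpc.timeout                   -> 900000
#   hbase.client.scanner.timeout.period -> 900000
# then point sqlline at that config dir (if the wrapper honors HBASE_CONF_DIR)
# and re-run the count.
export HBASE_CONF_DIR=/etc/hbase/conf
cd /usr/hdp/2.2.0.0-2041/phoenix/bin
./sqlline.py <params>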
I understand your comments about determining whether there are any failed map
or reduce operations. I watched each one in the application master GUI and
didn’t notice any that failed.
Finally, I understand your point about how the HBase data must have a unique
key. I confirmed that the source Hive table is also de-duplicated.
Thanks for any insight or hints you might have.
I’d love to be able to ingest the entire data set overnight.
It’s clear that I’m missing quite a bit of data and I’m going to have to start
over with this table…
From: Gabriel Reid [mailto:[email protected]]
Sent: Tuesday, June 23, 2015 2:57 AM
To: [email protected]<mailto:[email protected]>
Subject: Re: How To Count Rows In Large Phoenix Table?
Hi Zack,
Would it be possible to provide a few more details on what kinds of failures
that you're getting, both with the CsvBulkLoadTool, and with the "SELECT
COUNT(*)" query?
About question #1, there aren't any known bugs (that I'm aware of) that would
cause some records to go missing in the CsvBulkLoadTool. One thing to keep in
mind is that failure to parse an input record won't cause the CsvBulkLoadTool
to crash, but it will be recorded in the job counters. There are three job
counters that are recorded: input records, failed records, and output records.
If the "failed records" job counter is present (i.e. not zero), then that means
that some records that were present in the input files were not imported.
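For example, once an import job has finished, its counters can be pulled back up
from the command line (a rough sketch, assuming the standard Hadoop CLI and
re-using the job id from the counter dump earlier in this thread; the counter
names to look for are the ones printed in the job's own output):

# Dump the finished job's status, which includes the full counter set, and look
# for the Phoenix import counter group / a non-zero failed-record counter.
mapred job -status job_1433177972202_5025
# The aggregated task logs can also be scanned for parse errors or exceptions.
yarn logs -applicationId application_1433177972202_5025 | grep -iE "exception|error" | head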
About the failures that you're getting in the CsvBulkLoadTool, loading 0.5 TB
of data (or basically any amount of data) should just work. Could you give some
details on:
* how many records you're working with
* how many regions the output table has
* a general idea of the schema of the output table (how many columns are
involved, how many column families are involved)
* what the specific errors are that you're getting when the import job fails
One general issue to keep in mind that can cause a difference in the number of
records in Hive and in Phoenix is that your Phoenix table will have a primary
key which is guaranteed unique, and this will not be the case in Hive. This can
mean that there are multiple records in Hive that have the same values in the
primary key columns as defined in Phoenix, but when bringing these records over
to Phoenix they will end up as a single row. Any idea if this could be the
situation in your setup?
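A quick way to check would be something like the following (a rough sketch with
placeholder names: AAAA and BBBB stand for the two primary-key columns from the
redacted DDL earlier in the thread, and source_hive_table for the Hive source):

# If total_rows is larger than distinct_keys, duplicate keys in Hive would
# collapse into single rows in Phoenix and explain part of the difference.
hive -e "SELECT COUNT(*) AS total_rows,
                COUNT(DISTINCT CONCAT(AAAA, '|', CAST(BBBB AS STRING))) AS distinct_keys
         FROM source_hive_table;"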
- Gabriel
On Tue, Jun 23, 2015 at 6:11 AM anil gupta
<[email protected]<mailto:[email protected]>> wrote:
For #2: You can use the RowCounter mapreduce job of HBase to count the rows of a
large table. You don't need to write any code.
Here is the sample command to invoke:
hbase org.apache.hadoop.hbase.mapreduce.RowCounter <TABLE_NAME>
~Anil
On Mon, Jun 22, 2015 at 12:08 PM, Ciureanu Constantin
<[email protected]<mailto:[email protected]>> wrote:
Hive can connect to HBase and insert directly in either direction.
Don't know if it also works via Phoenix...
Counting is too slow as a single-threaded job / from the command line - you
should write a map-reduce job with a filter that loads just the keys, which is
really fast.
A map-reduce job is also the solution for loading data from Hive to HBase (read
from HDFS rather than Hive, prepare the output in Phoenix format, and bulk load
the results).
On Jun 22, 2015, 9:34 PM, "Riesland, Zack"
<[email protected]<mailto:[email protected]>> wrote:
I had a very large Hive table that I needed in HBase.
After asking around, I came to the conclusion that my best bet was to:
1 – export the Hive table to a CSV ‘file’/folder on HDFS
2 – Use the org.apache.phoenix.mapreduce.CsvBulkLoadTool to import the data.
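For reference, each import invocation looked roughly like this (a sketch: the
jar path, table name, ZooKeeper quorum, and input path are placeholders for the
redacted values):

# One CsvBulkLoadTool run per exported CSV file / sub-file.
HADOOP_CLASSPATH=$(hbase mapredcp) hadoop jar \
    /usr/hdp/2.2.0.0-2041/phoenix/phoenix-client.jar \
    org.apache.phoenix.mapreduce.CsvBulkLoadTool \
    --table REDACTED_TABLE \
    --input /data/export/part-00000.csv \
    --zookeeper zk1,zk2,zk3:2181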
I found that if I tried to pass the entire folder (~ 1/2 TB of data) to the
CsvBulkLoadTool, my job would eventually fail.
Empirically, it seems that on our particular cluster, 20-30 GB of data is the
most that the CsvBulkLoadTool can handle at one time without so many map tasks
timing out that the entire operation fails.
So I passed one sub-file at a time and eventually got all the data into HBase.
I tried doing a select count(*) on the table to see whether all of the rows
were transferred, but this eventually fails.
Today, I believe I found a set of data that is in Hive but NOT in HBase.
So, I have 2 questions:
1) Are there any known errors with the CsvBulkLoadTool such that it might skip
some data without getting my attention with some kind of error?
2) Is there a straightforward way to count the rows in my Phoenix table so that
I can compare the Hive table with the HBase table?
Thanks in advance!
--
Thanks & Regards,
Anil Gupta