ClassNotFoundException when submitting a job

2019-09-27 Thread 163
Hi Guys,

Flink version is 1.9.0 and built against HDP.

I got the exception below when submitting a job that uses a Hadoop input format to 
read a sequence file in HDFS.
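
For context, the relevant part of the job looks roughly like this (a simplified 
sketch; the HDFS path and the key/value Writable types are placeholders, and 
HadoopInputs comes from the flink-hadoop-compatibility module):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read a Hadoop SequenceFile from HDFS via the Hadoop compatibility layer.
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
        HadoopInputs.readSequenceFile(LongWritable.class, Text.class, "hdfs:///path/to/sequence-file"));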

Thanks for your help!

Qi


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not submit job (JobID: 78025b8d9cfd49d2f94190fb11849033)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:244)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:280)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., (JobManagerRunner.java:152)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: org/apache/flink/hadoop2/shaded/com/google/re2j/PatternSyntaxException
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:270)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:907)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:230)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
    at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMaste

Re: Flink- Heap Space running out

2019-09-27 Thread Nishant Gupta
Hi Fabian and Mike

*flink-conf.yaml  [in a 3-node cluster with 120 GB memory and 3 TB of disk on
each node]*
jobmanager.heap.size: 50120m
taskmanager.heap.size: 50120m

*With idle state retention using the configuration below (same heap space
issue)*
*execution:*
  planner: old
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 100
  parallelism: 3
  max-parallelism: 128

  min-idle-state-retention: 30
  max-idle-state-retention: 60
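
For comparison, my understanding is that the same retention can also be set
programmatically on the query config, roughly like this (just a sketch against
the 1.9 Table API; the values are only illustrative):

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
StreamQueryConfig queryConfig = tableEnv.queryConfig();
// Clean up state for keys that have been idle between 30 and 60 minutes.
queryConfig.withIdleStateRetentionTime(Time.minutes(30), Time.minutes(60));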

*With time-windowed join (Same heap space issue)*
*SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND
K.k_proctime + INTERVAL '5' MINUTE*

*I have tried temporal table functions - that works fine*

I was really hoping to make it work with idle state retention and the time-windowed join.
Could you please check the configuration and query?
Please let me know if any other details are required.


On Thu, Sep 26, 2019 at 8:46 PM Fabian Hueske  wrote:

> Hi,
>
> I don't think that the memory configuration is the issue.
> The problem is the join query. The join does not have any temporal
> boundaries.
> Therefore, both tables are completely stored in memory and never released.
>
> You can configure a memory eviction strategy via idle state retention [1]
> but you should make sure that this is really what you want.
> Alternatively, try a time-windowed join or a join with a temporal table
> function.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> On Thu., 26 Sept. 2019 at 17:08, miki haiat  > wrote:
>
>> You can configure the task manager memory in the config.yaml file.
>> What is the current configuration?
>>
>> On Thu, Sep 26, 2019, 17:14 Nishant Gupta 
>> wrote:
>>
>>> I am running a query to join a stream and a table as below. It is running
>>> out of heap space, even though there is enough heap space in the Flink cluster
>>> (60 GB * 3).
>>>
>>> Is there an eviction strategy needed for this query ?
>>>
>>> *SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON
>>> sourceKafka.CC=DefaulterTable.CC;  *
>>>
>>> Thanks
>>>
>>> Nishant
>>>
>>


Flink ColumnStats

2019-09-27 Thread Flavio Pompermaier
Hi all,
I've seen that there was recently an ongoing effort around Flink ColumnStats,
but I can't find a Flink job that computes Flink table stats; I only found
code that converts them from the Hive catalog.
Is there any Flink utility I can call to compute them on a Table?

We've tried to implement a dedicated job at
https://github.com/okkam-it/flink-descriptive-stats/blob/master/src/main/java/jar/ProfileJob.java
that works on a DataSet, but we didn't get any feedback about it. Is there any
better way to achieve this?
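
To make it concrete, the kind of per-column stats I mean could be computed with a
plain aggregation, e.g. (just a sketch; "myTable" and "col" are hypothetical names):

Table colStats = tableEnv.sqlQuery(
    "SELECT COUNT(col) AS non_null_count, COUNT(DISTINCT col) AS ndv, " +
    "       MIN(col) AS min_val, MAX(col) AS max_val " +
    "FROM myTable");

but doing that column by column feels clumsy, hence the question about a ready-made utility.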

Best,
Flavio


Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2019-09-27 Thread Oliwer Kostera
Hi all,


I'm using a ProcessWindowFunction in a keyed stream with the following definition:

final SingleOutputStreamOperator processWindowFunctionStream =
        keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
                .process(new CustomProcessWindowFunction())
                .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
                .name("Process window function");


My checkpointing configuration is set to use RocksDB state backend with 
incremental checkpointing and EXACTLY_ONCE mode.
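
For completeness, the backend is set up roughly as follows (the checkpoint path and
interval are placeholders):

env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true)); // true = incremental checkpoints
env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);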

At runtime I noticed that even though data ingestion is static - same keys and 
same message frequency - the state size of the process window operator keeps 
increasing. I reproduced it with a minimal similar setup here: 
https://github.com/loliver1234/flink-process-window-function.

Testing conditions:

  *   RabbitMQ source with Exactly-once guarantee and 65k prefetch count
  *   RabbitMQ sink to collect messages
  *   Simple ProcessWindowFunction that only passes messages through
  *   Stream time characteristic set to TimeCharacteristic.ProcessingTime

Testing scenario:

  *   Start flink job and check initial state size - State Size: 127 KB
  *   Start sending messages: the same 1000 unique keys every 1s (they do not 
fall into the defined session window gap of 100ms, so each message should create 
a new window)
  *   State of the process window operator keeps increasing - after 1 million 
messages the state ended up around 2 MB
  *   Stop sending messages and wait until the RabbitMQ queue is fully consumed and 
a few checkpoints go by
  *   Expected the state size to decrease to the base value, but it stayed at 
2 MB
  *   Continued to send messages with the same keys and the state kept its increasing 
trend.

What I checked:

  *   Registration and deregistration of timers set for time windows - each 
registration matched its deregistration
  *   Checked that in fact there are no window merges
  *   Tried a custom Trigger disabling window merges and setting the onProcessingTime 
trigger result to TriggerResult.FIRE_AND_PURGE - same state behavior (a simplified 
sketch of that trigger is below)
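
The trigger variant looked roughly like this (a simplified sketch; the merge handling 
is kept minimal here because session windows require a merging trigger):

public class FireAndPurgeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // Fire and purge instead of only firing, so window contents should be dropped.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // Keep merge handling minimal: just re-register the timer for the merged window.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
    }
}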

Tested with:

  *   Local Flink Mini Cluster running from IDE
  *   Flink HA standalone cluster run in Docker

On our staging environment, we noticed that the state for that operator keeps 
increasing indefinitely, after some months reaching even 1.5 GB for 100k unique 
keys.

With best regards

Oliwer



Re: Flink- Heap Space running out

2019-09-27 Thread Nishant Gupta
Apologies - correction made to the previous email.

Hi Fabian and Mike

*flink-conf.yaml  [in a 3-node cluster with 120 GB memory and 3 TB of disk on
each node]*
jobmanager.heap.size: 50120m
taskmanager.heap.size: 50120m

*With idle state retention using the configuration below (same heap space
issue)*
*execution:*
  planner: old
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 100
  parallelism: 3
  max-parallelism: 128

  min-idle-state-retention: 30
  max-idle-state-retention: 60

*With time-windowed join (records get missed or duplicated depending on the
time interval at which I push bad IPs)*
*SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND
K.k_proctime + INTERVAL '5' MINUTE*

*I have tried temporal table functions - that works fine*

I was really hoping to make it work with idle state retention and the time-windowed join.
Could you please check the configuration and query?
Please let me know if any other details are required.

On Fri, Sep 27, 2019 at 12:41 PM Nishant Gupta 
wrote:

>
> Hi Fabian and Mike
>
> *flink-conf.yaml  [In a 3 node cluster having 120 GB memory each and 3 TB
> hard disk ]*
> jobmanager.heap.size: 50120m
> taskmanager.heap.size: 50120m
>
> *With Idle state retention having below configuration  (Same heap space
> issue)  *
> *execution:*
>   planner: old
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 100
>   parallelism: 3
>   max-parallelism: 128
>
> *  min-idle-state-retention: 30  max-idle-state-retention: 60  *
>
> *With time-windowed join (Same heap space issue)*
> *SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
> AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND
> K.k_proctime + INTERVAL '5' MINUTE*
>
> *I have tried Temporal functions - It is working fine*
>
> I was really wishing to make it work with idle state and time window join.
> Could you please check the configuration and query.
> Please let me know if any other details are required
>
>
> On Thu, Sep 26, 2019 at 8:46 PM Fabian Hueske  wrote:
>
>> Hi,
>>
>> I don't think that the memory configuration is the issue.
>> The problem is the join query. The join does not have any temporal
>> boundaries.
>> Therefore, both tables are completely stored in memory and never released.
>>
>> You can configure a memory eviction strategy via idle state retention [1]
>> but you should make sure that this is really what you want.
>> Alternatively, try a time-windowed join or a join with a temporal table
>> function.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>
>> On Thu., 26 Sept. 2019 at 17:08, miki haiat <
>> miko5...@gmail.com> wrote:
>>
>>> You can configure the task manager memory in the config.yaml file.
>>> What is the current configuration?
>>>
>>> On Thu, Sep 26, 2019, 17:14 Nishant Gupta 
>>> wrote:
>>>
  I am running a query to join a stream and a table as below. It is
 running out of heap space, even though there is enough heap space in the Flink
 cluster (60 GB * 3).

 Is there an eviction strategy needed for this query ?

 *SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON
 sourceKafka.CC=DefaulterTable.CC;  *

 Thanks

 Nishant

>>>


Best way to link static data to event data?

2019-09-27 Thread John Smith
Using 1.8

I have a list of phone area codes, cities and their geo locations in a CSV
file, and my events from Kafka contain phone numbers.

I want to parse each phone number, get its area code, associate the phone
number with a city and geo location, and also count how many numbers are
in that city/geo location.


Re: Best way to link static data to event data?

2019-09-27 Thread Oytun Tez
Hi,

You should look at the broadcast state pattern in the Flink docs.
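
Roughly like this (a quick sketch; PhoneEvent, AreaCodeInfo and EnrichedEvent are
placeholders for your own classes):

MapStateDescriptor<String, AreaCodeInfo> descriptor = new MapStateDescriptor<>(
        "areaCodes", Types.STRING, TypeInformation.of(AreaCodeInfo.class));

// Broadcast the CSV records to all parallel instances of the enrichment operator.
BroadcastStream<AreaCodeInfo> areaCodeBroadcast = areaCodeStream.broadcast(descriptor);

phoneEvents.connect(areaCodeBroadcast)
        .process(new BroadcastProcessFunction<PhoneEvent, AreaCodeInfo, EnrichedEvent>() {

            @Override
            public void processElement(PhoneEvent event, ReadOnlyContext ctx, Collector<EnrichedEvent> out) throws Exception {
                // Look up the area code in the broadcast state and enrich the event.
                AreaCodeInfo info = ctx.getBroadcastState(descriptor).get(event.getAreaCode());
                if (info != null) {
                    out.collect(new EnrichedEvent(event, info));
                }
            }

            @Override
            public void processBroadcastElement(AreaCodeInfo info, Context ctx, Collector<EnrichedEvent> out) throws Exception {
                ctx.getBroadcastState(descriptor).put(info.getAreaCode(), info);
            }
        });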

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Fri, Sep 27, 2019 at 2:42 PM John Smith  wrote:

> Using 1.8
>
> I have a list of phone area codes, cities and their geo location in CSV
> file. And my events from Kafka contain phone numbers.
>
> I want to parse the phone number get it's area code and then associate the
> phone number to a city, geo location and as well count how many numbers are
> in that city/geo location.
>


Re: Best way to link static data to event data?

2019-09-27 Thread John Smith
I don't think I need state for this...

I need to load a CSV, I'm guessing as a table, and then filter my events,
parse the number, transform the event into geolocation data, and sink that to the
downstream data source.

So I'm guessing I need a CSV source and my Kafka source, somehow join
those, and transform the event...

On Fri, 27 Sep 2019 at 14:43, Oytun Tez  wrote:

> Hi,
>
> You should look broadcast state pattern in Flink docs.
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Fri, Sep 27, 2019 at 2:42 PM John Smith  wrote:
>
>> Using 1.8
>>
>> I have a list of phone area codes, cities and their geo location in CSV
>> file. And my events from Kafka contain phone numbers.
>>
>> I want to parse the phone number get it's area code and then associate
>> the phone number to a city, geo location and as well count how many numbers
>> are in that city/geo location.
>>
>


Re: Best way to link static data to event data?

2019-09-27 Thread Sameer W
Connected streams are one option, but they may be overkill in your scenario if 
your CSV does not refresh. If your CSV is small enough (record-count wise), you 
could parse it, load it into a (serializable) object and pass it to the 
constructor of the operator in which you will be streaming the data.

If the CSV can be made available via a shared network folder (or S3 in the case 
of AWS) you could also read it in the open() method (if you use the Rich 
versions of the operators), as sketched below.
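
Something along these lines (an untested sketch; the path, the CSV layout and the
CityInfo/PhoneEvent/EnrichedEvent types are placeholders):

public class EnrichWithAreaCodes extends RichMapFunction<PhoneEvent, EnrichedEvent> {

    private transient Map<String, CityInfo> areaCodeToCity;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Re-read the shared CSV every time the operator (re)starts.
        areaCodeToCity = new HashMap<>();
        try (BufferedReader reader = Files.newBufferedReader(Paths.get("/shared/area_codes.csv"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",");
                areaCodeToCity.put(parts[0], new CityInfo(parts[1], parts[2], parts[3]));
            }
        }
    }

    @Override
    public EnrichedEvent map(PhoneEvent event) {
        return new EnrichedEvent(event, areaCodeToCity.get(event.getAreaCode()));
    }
}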

The real problem, I guess, is how frequently the CSV updates. If you want 
the updates to propagate in near real time (or on a schedule), option 1 
(parse in the driver and send it via the constructor) does not work. Also, with the 
second option you are responsible for refreshing the file read from 
the shared folder.

In that case use connected streams, where one stream periodically re-reads the 
file and sends it down the stream (the other stream reads the events). The 
refresh interval is your tolerance for stale data in the CSV.

On Fri, Sep 27, 2019 at 3:49 PM John Smith  wrote:

> I don't think I need state for this...
>
> I need to load a CSV. I'm guessing as a table and then filter my events
> parse the number, transform the event into geolocation data and sink that
> downstream data source.
>
> So I'm guessing i need a CSV source and my Kafka source and somehow join
> those transform the event...
>
> On Fri, 27 Sep 2019 at 14:43, Oytun Tez  wrote:
>
>> Hi,
>>
>> You should look broadcast state pattern in Flink docs.
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Fri, Sep 27, 2019 at 2:42 PM John Smith 
>> wrote:
>>
>>> Using 1.8
>>>
>>> I have a list of phone area codes, cities and their geo location in CSV
>>> file. And my events from Kafka contain phone numbers.
>>>
>>> I want to parse the phone number get it's area code and then associate
>>> the phone number to a city, geo location and as well count how many numbers
>>> are in that city/geo location.
>>>
>>


Re: Best way to link static data to event data?

2019-09-27 Thread John Smith
It's a fairly small static file that may update once in a blue moon lol. But
I'm hoping to use existing functions. Why can't I just use the CSV table
source?

Why should I now have to either write my own CSV parser or look for a 3rd-party
one, then put the data in a Java Map and look it up in that map? I'm finding Flink
to be a bit of death by 1000 paper cuts lol

If I put the CSV in a table, I can then join it with the event, no?
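
Something like this is what I had in mind (an untested sketch; the file path and
field names are made up):

CsvTableSource areaCodes = CsvTableSource.builder()
        .path("/data/area_codes.csv")
        .field("area_code", Types.STRING)
        .field("city", Types.STRING)
        .field("lat", Types.DOUBLE)
        .field("lon", Types.DOUBLE)
        .build();
tableEnv.registerTableSource("AreaCodes", areaCodes);

// Register the Kafka-backed stream as a table and join on the parsed area code.
tableEnv.registerDataStream("Calls", callStream, "phone_number, area_code, proctime.proctime");
Table enriched = tableEnv.sqlQuery(
        "SELECT c.phone_number, a.city, a.lat, a.lon " +
        "FROM Calls AS c JOIN AreaCodes AS a ON c.area_code = a.area_code");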

On Fri, 27 Sep 2019 at 16:25, Sameer W  wrote:

> Connected Streams is one option. But may be an overkill in your scenario
> if your CSV does not refresh. If your CSV is small enough (number of
> records wise), you could parse it and load it into an object (serializable)
> and pass it to the constructor of the operator where you will be streaming
> the data.
>
> If the CSV can be made available via a shared network folder (or S3 in
> case of AWS) you could also read it in the open function (if you use Rich
> versions of the operator).
>
> The real problem I guess is how frequently does the CSV update. If you
> want the updates to propagate in near real time (or on schedule) the option
> 1  ( parse in driver and send it via constructor does not work). Also in
> the second option you need to be responsible for refreshing the file read
> from the shared folder.
>
> In that case use Connected Streams where the stream reading in the file
> (the other stream reads the events) periodically re-reads the file and
> sends it down the stream. The refresh interval is your tolerance of stale
> data in the CSV.
>
> On Fri, Sep 27, 2019 at 3:49 PM John Smith  wrote:
>
>> I don't think I need state for this...
>>
>> I need to load a CSV. I'm guessing as a table and then filter my events
>> parse the number, transform the event into geolocation data and sink that
>> downstream data source.
>>
>> So I'm guessing i need a CSV source and my Kafka source and somehow join
>> those transform the event...
>>
>> On Fri, 27 Sep 2019 at 14:43, Oytun Tez  wrote:
>>
>>> Hi,
>>>
>>> You should look broadcast state pattern in Flink docs.
>>>
>>> ---
>>> Oytun Tez
>>>
>>> *M O T A W O R D*
>>> The World's Fastest Human Translation Platform.
>>> oy...@motaword.com — www.motaword.com
>>>
>>>
>>> On Fri, Sep 27, 2019 at 2:42 PM John Smith 
>>> wrote:
>>>
 Using 1.8

 I have a list of phone area codes, cities and their geo location in CSV
 file. And my events from Kafka contain phone numbers.

 I want to parse the phone number get it's area code and then associate
 the phone number to a city, geo location and as well count how many numbers
 are in that city/geo location.

>>>


Re: Best way to link static data to event data?

2019-09-27 Thread Sameer Wadkar
The main consideration in these types of scenarios is not the type of source 
function you use. The key point is how the event operator gets the slow-moving 
master data and caches it, and then recovers it if it fails and restarts 
again.

It does not matter that the CSV file does not change often. It is possible that 
the event operator may fail and restart, and the CSV data needs to be made available 
to it again.

In that scenario the initial suggestion I made, to pass the CSV data in the 
constructor, is not adequate by itself. You need to store it in operator 
state, which allows the operator to recover it when it restarts on failure.

As long as the above takes place you have resiliency, and you can use any 
suitable method or source. I have not used the Table sources as much, but connected 
streams and operator state have worked out for me in similar scenarios; a rough 
sketch is below.
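
For example (an untested sketch; CsvRow, Event and EnrichedEvent are placeholders 
for your own types):

public class MasterDataEnricher extends RichCoFlatMapFunction<CsvRow, Event, EnrichedEvent>
        implements CheckpointedFunction {

    private transient ListState<CsvRow> checkpointedMasterData;
    private final Map<String, CsvRow> masterData = new HashMap<>();

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedMasterData = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("master-data", CsvRow.class));
        // On restart, rebuild the in-memory cache from operator state.
        for (CsvRow row : checkpointedMasterData.get()) {
            masterData.put(row.getKey(), row);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedMasterData.clear();
        checkpointedMasterData.addAll(new ArrayList<>(masterData.values()));
    }

    @Override
    public void flatMap1(CsvRow row, Collector<EnrichedEvent> out) {
        // Slow-moving master data stream: just refresh the cache.
        masterData.put(row.getKey(), row);
    }

    @Override
    public void flatMap2(Event event, Collector<EnrichedEvent> out) {
        CsvRow row = masterData.get(event.getKey());
        if (row != null) {
            out.collect(new EnrichedEvent(event, row));
        }
    }
}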

Sameer

Sent from my iPhone

> On Sep 27, 2019, at 4:38 PM, John Smith  wrote:
> 
> It's a fairly small static file that may update once in a blue moon lol But 
> I'm hopping to use existing functions. Why can't I just use CSV to table 
> source?
> 
> Why should I have to now either write my own CSV parser or look for 3rd 
> party, then what put in a Java Map and lookup that map? I'm finding Flink to 
> be a bit of death by 1000 paper cuts lol
> 
> if i put the CSV in a table I can then use it to join across it with the 
> event no?
> 
>> On Fri, 27 Sep 2019 at 16:25, Sameer W  wrote:
>> Connected Streams is one option. But may be an overkill in your scenario if 
>> your CSV does not refresh. If your CSV is small enough (number of records 
>> wise), you could parse it and load it into an object (serializable) and pass 
>> it to the constructor of the operator where you will be streaming the data. 
>> 
>> If the CSV can be made available via a shared network folder (or S3 in case 
>> of AWS) you could also read it in the open function (if you use Rich 
>> versions of the operator).
>> 
>> The real problem I guess is how frequently does the CSV update. If you want 
>> the updates to propagate in near real time (or on schedule) the option 1  ( 
>> parse in driver and send it via constructor does not work). Also in the 
>> second option you need to be responsible for refreshing the file read from 
>> the shared folder.
>> 
>> In that case use Connected Streams where the stream reading in the file (the 
>> other stream reads the events) periodically re-reads the file and sends it 
>> down the stream. The refresh interval is your tolerance of stale data in the 
>> CSV.
>> 
>>> On Fri, Sep 27, 2019 at 3:49 PM John Smith  wrote:
>>> I don't think I need state for this...
>>> 
>>> I need to load a CSV. I'm guessing as a table and then filter my events 
>>> parse the number, transform the event into geolocation data and sink that 
>>> downstream data source.
>>> 
>>> So I'm guessing i need a CSV source and my Kafka source and somehow join 
>>> those transform the event...
>>> 
 On Fri, 27 Sep 2019 at 14:43, Oytun Tez  wrote:
 Hi,
 
 You should look broadcast state pattern in Flink docs.
 
 ---
 Oytun Tez
 
 M O T A W O R D
 The World's Fastest Human Translation Platform.
 oy...@motaword.com — www.motaword.com
 
 
> On Fri, Sep 27, 2019 at 2:42 PM John Smith  wrote:
> Using 1.8
> 
> I have a list of phone area codes, cities and their geo location in CSV 
> file. And my events from Kafka contain phone numbers.
> 
> I want to parse the phone number get it's area code and then associate 
> the phone number to a city, geo location and as well count how many 
> numbers are in that city/geo location.