Re: Suggestion on Join Approach with Spark

2019-05-15 Thread Chetan Khatri
Hello Nicholas,

I sincerely apologise.

Thanks

On Wed, May 15, 2019 at 11:34 PM Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> This kind of question is for the User list, or for something like Stack
> Overflow. It's not on topic here.
>
> The dev list (i.e. this list) is for discussions about the development of
> Spark itself.
>
> On Wed, May 15, 2019 at 1:50 PM Chetan Khatri 
> wrote:
>
>> Any one help me, I am confused. :(
>>
>> On Wed, May 15, 2019 at 7:28 PM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Spark Developers,
>>>
>>> I have a question on Spark Join I am doing.
>>>
>>> I have a full load data from RDBMS and storing at HDFS let's say,
>>>
>>> val historyDF = spark.read.parquet(*"/home/test/transaction-line-item"*)
>>>
>>> and I am getting changed data at seperate hdfs path,let's say;
>>>
>>> val deltaDF = spark.read.parquet("/home/test/transaction-line-item-delta")
>>>
>>> Now I would like to take rows from deltaDF and ignore only those records 
>>> from historyDF, and write to some MySQL table.
>>>
>>> Once I am done with writing to MySQL table, I would like to update 
>>> */home/test/transaction-line-item *as overwrite. Now I can't just
>>>
>>> overwrite because lazy evaluation and DAG structure unless write to 
>>> somewhere else and then write back as overwrite.
>>>
>>> val syncDataDF = historyDF.join(deltaDF.select("TRANSACTION_BY_LINE_ID", 
>>> "sys_change_column"), Seq("TRANSACTION_BY_LINE_ID"),
>>>   "left_outer").filter(deltaDF.col("sys_change_column").isNull)
>>> .drop(deltaDF.col("sys_change_column"))
>>>
>>> val mergedDataDF = syncDataDF.union(deltaDF)
>>>
>>> I believe, Without doing *union *, only with Join this can be done. Please 
>>> suggest best approach.
>>>
>>> As I can't write back *mergedDataDF * to the path of historyDF, because 
>>> from there I am only reading. What I am doing is to write at temp
>>>
>>> path and then read  from there and write back! Which is bad Idea, I need 
>>> suggestion here...
>>>
>>>
>>> mergedDataDF.write.mode(SaveMode.Overwrite).parquet("home/test/transaction-line-item-temp/")
>>> val tempMergedDF = 
>>> spark.read.parquet("home/test/transaction-line-item-temp/")
>>> tempMergedDF.write.mode(SaveMode.Overwrite).parquet("*/home/test/transaction-line-item"*)
>>>
>>>
>>> Please suggest me best approach.
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>>


Re: Suggestion on Join Approach with Spark

2019-05-15 Thread Chetan Khatri
Can anyone help me? I am confused. :(

On Wed, May 15, 2019 at 7:28 PM Chetan Khatri 
wrote:

> Hello Spark Developers,
>
> I have a question on Spark Join I am doing.
>
> I have a full load data from RDBMS and storing at HDFS let's say,
>
> val historyDF = spark.read.parquet(*"/home/test/transaction-line-item"*)
>
> and I am getting changed data at seperate hdfs path,let's say;
>
> val deltaDF = spark.read.parquet("/home/test/transaction-line-item-delta")
>
> Now I would like to take rows from deltaDF and ignore only those records from 
> historyDF, and write to some MySQL table.
>
> Once I am done with writing to MySQL table, I would like to update 
> */home/test/transaction-line-item *as overwrite. Now I can't just
>
> overwrite because lazy evaluation and DAG structure unless write to somewhere 
> else and then write back as overwrite.
>
> val syncDataDF = historyDF.join(deltaDF.select("TRANSACTION_BY_LINE_ID", 
> "sys_change_column"), Seq("TRANSACTION_BY_LINE_ID"),
>   "left_outer").filter(deltaDF.col("sys_change_column").isNull)
> .drop(deltaDF.col("sys_change_column"))
>
> val mergedDataDF = syncDataDF.union(deltaDF)
>
> I believe, Without doing *union *, only with Join this can be done. Please 
> suggest best approach.
>
> As I can't write back *mergedDataDF * to the path of historyDF, because from 
> there I am only reading. What I am doing is to write at temp
>
> path and then read  from there and write back! Which is bad Idea, I need 
> suggestion here...
>
>
> mergedDataDF.write.mode(SaveMode.Overwrite).parquet("home/test/transaction-line-item-temp/")
> val tempMergedDF = spark.read.parquet("home/test/transaction-line-item-temp/")
> tempMergedDF.write.mode(SaveMode.Overwrite).parquet("*/home/test/transaction-line-item"*)
>
>
> Please suggest me best approach.
>
>
> Thanks
>
>
>
>


Suggestion on Join Approach with Spark

2019-05-15 Thread Chetan Khatri
Hello Spark Developers,

I have a question about a Spark join I am working on.

I have a full load of data from an RDBMS stored at HDFS, let's say:

val historyDF = spark.read.parquet("/home/test/transaction-line-item")

and I am getting the changed (delta) data at a separate HDFS path, let's say:

val deltaDF = spark.read.parquet("/home/test/transaction-line-item-delta")

Now I would like to take the rows from deltaDF, ignore the corresponding
records in historyDF, and write the result to a MySQL table.

Once I am done writing to the MySQL table, I would like to overwrite
/home/test/transaction-line-item with the merged result. I can't simply
overwrite the path I am reading from, because of lazy evaluation and the DAG
structure, unless I write somewhere else first and then write back as overwrite.

val syncDataDF = historyDF
  .join(deltaDF.select("TRANSACTION_BY_LINE_ID", "sys_change_column"),
    Seq("TRANSACTION_BY_LINE_ID"), "left_outer")
  .filter(deltaDF.col("sys_change_column").isNull)
  .drop(deltaDF.col("sys_change_column"))

val mergedDataDF = syncDataDF.union(deltaDF)

I believe this can be done with a join alone, without the union. Please
suggest the best approach.
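
For reference, a minimal sketch of that idea, using the same column names as
above; it assumes sys_change_column is never null inside deltaDF and that the
remaining delta columns line up with historyDF's schema:

// Sketch only: left_anti keeps the history rows whose TRANSACTION_BY_LINE_ID
// does not appear in the delta, equivalent to the left_outer + isNull filter above.
val unchangedDF  = historyDF.join(deltaDF, Seq("TRANSACTION_BY_LINE_ID"), "left_anti")
val mergedDataDF = unchangedDF.union(deltaDF.drop("sys_change_column"))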

I can't write mergedDataDF back to the path of historyDF while I am still
reading from it. So what I am doing is writing to a temp path, then reading
from there and writing back, which is a bad idea. I need a suggestion here:


mergedDataDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item-temp/")
val tempMergedDF = spark.read.parquet("/home/test/transaction-line-item-temp/")
tempMergedDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")


Please suggest the best approach.


Thanks


Re: Need help for Delta.io

2019-05-11 Thread Chetan Khatri
Any thoughts, please?

On Fri, May 10, 2019 at 2:22 AM Chetan Khatri 
wrote:

> Hello All,
>
> I need your help / suggestions,
>
> I am using Spark 2.3.1 with the HDP 2.6.1 distribution. I will describe my use
> case so you can see where people are trying to use Delta.
> My use case is I have source as a MSSQL Server (OLTP) and get data at HDFS
> currently in Parquet and Avro formats. Now I would like to do Incremental
> load / delta load, so I am using CT (Change Tracking Ref.
> https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server?view=sql-server-2017)
> to get updated and deleted records Primary Key and using that I am only
> pulling those records which got updated and deleted. And I would like to
> now Update / Delete Data from Parquet. Currently I am doing full  load,
> which I would like to avoid.
>
> Could you please suggest what the best approach would be?
>
> As HDP doesn't have Spark 2.4.2 available so I can't change the
> infrastructure, Is there any way to use Delta.io on Spark 2.3.1 as I have
> existing codebase written for last year and half  in Scala 2.11  which also
> I don't want to break with Scala 2.12.
>
> I don't need versioning, transaction log at parquet. So if anything else
> fits to my use case. Please do advise.
>
> Thank you.
>


How to parallelize JDBC Read in Spark

2018-09-06 Thread Chetan Khatri
Hello Dev Users,

I am struggling to parallelize a JDBC read in Spark. It is using only 1-2
tasks to read the data, and the read is taking a long time.

Ex.

val invoiceLineItemDF = ((spark.read.jdbc(url = t360jdbcURL,
  table = invoiceLineItemQuery,
  columnName = "INVOICE_LINE_ITEM_ID",
  lowerBound = 1L,
  upperBound = 100L,
  numPartitions = 200,
  connectionProperties = connectionProperties
)))
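
For reference, a minimal sketch of how these options behave (the bounds below
are assumptions): lowerBound and upperBound do not filter rows, they only set
the stride of the generated predicates on columnName, so with lowerBound = 1
and upperBound = 100 almost every row falls into the last, open-ended
partition and one task ends up reading nearly all the data. The bounds should
roughly cover the real MIN/MAX of INVOICE_LINE_ITEM_ID:

// Sketch only: look up the real bounds first, e.g. SELECT MIN(..), MAX(..) on the table.
val invoiceLineItemDF = spark.read.jdbc(
  url = t360jdbcURL,
  table = invoiceLineItemQuery,
  columnName = "INVOICE_LINE_ITEM_ID",
  lowerBound = 1L,            // ~ actual minimum id (assumed)
  upperBound = 50000000L,     // ~ actual maximum id (assumed)
  numPartitions = 200,        // 200 parallel queries, each covering ~(upper - lower) / 200 ids
  connectionProperties = connectionProperties
)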


Thanks


Re: Select top (100) percent equivalent in spark

2018-09-05 Thread Chetan Khatri
Sean, thank you.
Do you think tempDF.orderBy($"invoice_id".desc).limit(100)
would give the same result? I think so.
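
For reference, a minimal sketch of the two forms mentioned in this thread (the
column name and type are assumptions):

// DataFrame form: global sort followed by limit, which Spark plans as a take-ordered operation
val top100DF  = tempDF.orderBy($"invoice_id".desc).limit(100)
// RDD form, as Russell points out: top(n) with an implicit ordering
val top100Ids = tempDF.select("invoice_id").rdd.map(_.getLong(0)).top(100)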

Thanks

On Wed, Sep 5, 2018 at 12:58 AM Sean Owen  wrote:

> Sort and take head(n)?
>
> On Tue, Sep 4, 2018 at 12:07 PM Chetan Khatri 
> wrote:
>
>> Dear Spark dev, anything equivalent in spark ?
>>
>


Re: Select top (100) percent equivalent in spark

2018-09-04 Thread Chetan Khatri
Thanks

On Wed 5 Sep, 2018, 2:15 AM Russell Spitzer, 
wrote:

> RDD: Top
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@top(num:Int)(implicitord:Ordering[T]):Array[T
> ]
> Which is pretty much what Sean suggested
>
> For Dataframes I think doing a order and limit would be equivalent after
> optimizations.
>
> On Tue, Sep 4, 2018 at 2:28 PM Sean Owen  wrote:
>
>> Sort and take head(n)?
>>
>> On Tue, Sep 4, 2018 at 12:07 PM Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Dear Spark dev, anything equivalent in spark ?
>>>
>>


Select top (100) percent equivalent in spark

2018-09-04 Thread Chetan Khatri
Dear Spark dev, is there anything equivalent in Spark?


Reading 20 GB of log files from Directory - Out of Memory Error

2018-08-25 Thread Chetan Khatri
Hello Spark Dev Community,

A friend of mine is facing an issue while reading 20 GB of log files from a
directory on a cluster.
The approaches are as below:

*1. This gives out of memory error.*
val logRDD =
sc.wholeTextFiles("file:/usr/local/hadoop/spark-2.3.0-bin-hadoop2.7/logs/*")
val mappedRDD = logRDD.flatMap { x => x._2.split("[^A-Za-z']+") }.map { x
=> x.replaceAll("""\n""", " ")}

*2. Individual files can be processed with below approach*
val textlogRDD =
sc.textFile("file:///usr/local/hadoop/spark-2.3.0-bin-hadoop2.7/logs/spark-hduser-org.apache.spark.deploy.master.Master-1-chetan-ThinkPad-E460.out")
val textMappedRDD = textlogRDD.flatMap { x => x.split("[^A-Za-z']+")}.map {
y => y.replaceAll("""\n""", " ")}

*3. Could be tried (the replaceAll must apply to the tokens, not to the regex string):*
val tempRDD =
sc.wholeTextFiles("file:/usr/local/hadoop/spark-2.3.0-bin-hadoop2.7/logs/*")
  .flatMap(files => files._2.split("[^A-Za-z']+").map(_.replaceAll("""\n""", " ")))

*Thoughts:*
1. If OutOfMemory is the issue, then increasing driver-memory at spark-submit
could help, since a *collect()* would cause this by pulling everything onto the
driver node.
2. Or persist the RDD to disk with StorageLevel.MEMORY_AND_DISK_SER_2 and then
proceed further.
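
For reference, a minimal sketch (same hypothetical log directory) of reading
the whole directory with textFile: each record is a single line rather than a
whole file, so no single string has to hold a multi-GB log the way
wholeTextFiles requires, and no newline replacement is needed:

val logLinesRDD = sc.textFile("file:///usr/local/hadoop/spark-2.3.0-bin-hadoop2.7/logs/*")
val tokensRDD   = logLinesRDD.flatMap(_.split("[^A-Za-z']+")).filter(_.nonEmpty)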

Any suggestions please ?

Thanks
-Chetan


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-15 Thread Chetan Khatri
Hello Jayant,

Thanks for the great OSS contribution :)

On Thu, Jul 12, 2018 at 1:36 PM, Jayant Shekhar 
wrote:

> Hello Chetan,
>
> Sorry missed replying earlier. You can find some sample code here :
>
> http://sparkflows.readthedocs.io/en/latest/user-guide/
> python/pipe-python.html
>
> We will continue adding more there.
>
> Feel free to ping me directly in case of questions.
>
> Thanks,
> Jayant
>
>
> On Mon, Jul 9, 2018 at 9:56 PM, Chetan Khatri  > wrote:
>
>> Hello Jayant,
>>
>> Thank you so much for suggestion. My view was to  use Python function as
>> transformation which can take couple of column names and return object.
>> which you explained. would that possible to point me to similiar codebase
>> example.
>>
>> Thanks.
>>
>> On Fri, Jul 6, 2018 at 2:56 AM, Jayant Shekhar 
>> wrote:
>>
>>> Hello Chetan,
>>>
>>> We have currently done it with .pipe(.py) as Prem suggested.
>>>
>>> That passes the RDD as CSV strings to the python script. The python
>>> script can either process it line by line, create the result and return it
>>> back. Or create things like Pandas Dataframe for processing and finally
>>> write the results back.
>>>
>>> In the Spark/Scala/Java code, you get an RDD of string, which we convert
>>> back to a Dataframe.
>>>
>>> Feel free to ping me directly in case of questions.
>>>
>>> Thanks,
>>> Jayant
>>>
>>>
>>> On Thu, Jul 5, 2018 at 3:39 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Prem sure, Thanks for suggestion.
>>>>
>>>> On Wed, Jul 4, 2018 at 8:38 PM, Prem Sure 
>>>> wrote:
>>>>
>>>>> try .pipe(.py) on RDD
>>>>>
>>>>> Thanks,
>>>>> Prem
>>>>>
>>>>> On Wed, Jul 4, 2018 at 7:59 PM, Chetan Khatri <
>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>
>>>>>> Can someone please suggest me , thanks
>>>>>>
>>>>>> On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, <
>>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Dear Spark User / Dev,
>>>>>>>
>>>>>>> I would like to pass Python user defined function to Spark Job
>>>>>>> developed using Scala and return value of that function would be 
>>>>>>> returned
>>>>>>> to DF / Dataset API.
>>>>>>>
>>>>>>> Can someone please guide me, which would be best approach to do
>>>>>>> this. Python function would be mostly transformation function. Also 
>>>>>>> would
>>>>>>> like to pass Java Function as a String to Spark / Scala job and it 
>>>>>>> applies
>>>>>>> to RDD / Data Frame and should return RDD / Data Frame.
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-09 Thread Chetan Khatri
Hello Jayant,

Thank you so much for the suggestion. My view was to use a Python function as
a transformation which can take a couple of column names and return an object,
which is what you explained. Would it be possible to point me to a similar
codebase example?

Thanks.
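
For reference, a minimal sketch of the .pipe(.py) flow described in the quoted
reply below; the script path, column names and output schema are assumptions,
not code from this thread:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Sketch only: ship rows to an external Python script as CSV lines and read CSV lines back.
val csvLines  = inputDF.rdd.map(_.mkString(","))         // hypothetical inputDF -> CSV strings
val pipedRDD  = csvLines.pipe("python transform.py")     // transform.py reads stdin, writes stdout
val outSchema = StructType(Seq(StructField("colA", StringType), StructField("colB", StringType)))
val outputDF  = spark.createDataFrame(pipedRDD.map(l => Row.fromSeq(l.split(",", -1).toSeq)), outSchema)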

On Fri, Jul 6, 2018 at 2:56 AM, Jayant Shekhar 
wrote:

> Hello Chetan,
>
> We have currently done it with .pipe(.py) as Prem suggested.
>
> That passes the RDD as CSV strings to the python script. The python script
> can either process it line by line, create the result and return it back.
> Or create things like Pandas Dataframe for processing and finally write the
> results back.
>
> In the Spark/Scala/Java code, you get an RDD of string, which we convert
> back to a Dataframe.
>
> Feel free to ping me directly in case of questions.
>
> Thanks,
> Jayant
>
>
> On Thu, Jul 5, 2018 at 3:39 AM, Chetan Khatri  > wrote:
>
>> Prem sure, Thanks for suggestion.
>>
>> On Wed, Jul 4, 2018 at 8:38 PM, Prem Sure  wrote:
>>
>>> try .pipe(.py) on RDD
>>>
>>> Thanks,
>>> Prem
>>>
>>> On Wed, Jul 4, 2018 at 7:59 PM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Can someone please suggest me , thanks
>>>>
>>>> On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, 
>>>> wrote:
>>>>
>>>>> Hello Dear Spark User / Dev,
>>>>>
>>>>> I would like to pass Python user defined function to Spark Job
>>>>> developed using Scala and return value of that function would be returned
>>>>> to DF / Dataset API.
>>>>>
>>>>> Can someone please guide me, which would be best approach to do this.
>>>>> Python function would be mostly transformation function. Also would like 
>>>>> to
>>>>> pass Java Function as a String to Spark / Scala job and it applies to RDD 
>>>>> /
>>>>> Data Frame and should return RDD / Data Frame.
>>>>>
>>>>> Thank you.
>>>>>
>>>>>
>>>>>
>>>>>
>>>
>>
>


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-05 Thread Chetan Khatri
Sure, Prem. Thanks for the suggestion.

On Wed, Jul 4, 2018 at 8:38 PM, Prem Sure  wrote:

> try .pipe(.py) on RDD
>
> Thanks,
> Prem
>
> On Wed, Jul 4, 2018 at 7:59 PM, Chetan Khatri  > wrote:
>
>> Can someone please suggest me , thanks
>>
>> On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, 
>> wrote:
>>
>>> Hello Dear Spark User / Dev,
>>>
>>> I would like to pass Python user defined function to Spark Job developed
>>> using Scala and return value of that function would be returned to DF /
>>> Dataset API.
>>>
>>> Can someone please guide me, which would be best approach to do this.
>>> Python function would be mostly transformation function. Also would like to
>>> pass Java Function as a String to Spark / Scala job and it applies to RDD /
>>> Data Frame and should return RDD / Data Frame.
>>>
>>> Thank you.
>>>
>>>
>>>
>>>
>


Re: Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-04 Thread Chetan Khatri
Can someone please advise? Thanks.

On Tue 3 Jul, 2018, 5:28 PM Chetan Khatri, 
wrote:

> Hello Dear Spark User / Dev,
>
> I would like to pass Python user defined function to Spark Job developed
> using Scala and return value of that function would be returned to DF /
> Dataset API.
>
> Can someone please guide me, which would be best approach to do this.
> Python function would be mostly transformation function. Also would like to
> pass Java Function as a String to Spark / Scala job and it applies to RDD /
> Data Frame and should return RDD / Data Frame.
>
> Thank you.
>
>
>
>


Run Python User Defined Functions / code in Spark with Scala Codebase

2018-07-03 Thread Chetan Khatri
Hello Dear Spark User / Dev,

I would like to pass a Python user-defined function to a Spark job developed
in Scala, and have the return value of that function come back to the
DataFrame / Dataset API.

Can someone please guide me on the best approach to do this? The Python
function would mostly be a transformation function. I would also like to pass
a Java function as a String to the Spark / Scala job, have it applied to an
RDD / DataFrame, and get an RDD / DataFrame back.

Thank you.


Re: Spark Writing to parquet directory : java.io.IOException: Disk quota exceeded

2017-11-22 Thread Chetan Khatri
Could anybody reply on this?

On Tue, Nov 21, 2017 at 3:36 PM, Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

>
> Hello Spark Users,
>
> I am getting the below error when I am trying to write a dataset to a parquet
> location. I have enough disk space available. Last time I faced the same kind
> of error, which was resolved by increasing the number of cores in the hyper
> parameters. Currently the result set data size is almost 400 GB with the below
> hyper parameters:
>
> Driver memory: 4g
> Executor Memory: 16g
> Executor cores=12
> num executors= 8
>
> Still it's failing. Any idea whether increasing executor memory and the
> number of executors could get it resolved?
>
>
> 17/11/21 04:29:37 ERROR storage.DiskBlockObjectWriter: Uncaught exception
> while reverting partial writes to file /mapr/chetan/local/david.com/
> tmp/hadoop/nm-local-dir/usercache/david-khurana/appcache/application_
> 1509639363072_10572/blockmgr-008604e6-37cb-421f-8cc5-
> e94db75684e7/12/temp_shuffle_ae885911-a1ef-404f-9a6a-ded544bb5b3c
> java.io.IOException: Disk quota exceeded
> at java.io.FileOutputStream.close0(Native Method)
> at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
> at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
> at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
> at java.io.FileOutputStream.close(FileOutputStream.java:354)
> at org.apache.spark.storage.TimeTrackingOutputStream.close(
> TimeTrackingOutputStream.java:72)
> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> at net.jpountz.lz4.LZ4BlockOutputStream.close(
> LZ4BlockOutputStream.java:178)
> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$
> anon$2.close(UnsafeRowSerializer.scala:96)
> at org.apache.spark.storage.DiskBlockObjectWriter$$
> anonfun$close$2.apply$mcV$sp(DiskBlockObjectWriter.scala:108)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.
> scala:1316)
> at org.apache.spark.storage.DiskBlockObjectWriter.close(
> DiskBlockObjectWriter.scala:107)
> at org.apache.spark.storage.DiskBlockObjectWriter.
> revertPartialWritesAndClose(DiskBlockObjectWriter.scala:159)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.
> stop(BypassMergeSortShuffleWriter.java:234)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:85)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 17/11/21 04:29:37 WARN netty.OneWayOutboxMessage: Failed to send one-way
> RPC.
> java.io.IOException: Failed to connect to /192.168.123.43:58889
> at org.apache.spark.network.client.TransportClientFactory.
> createClient(TransportClientFactory.java:228)
> at org.apache.spark.network.client.TransportClientFactory.
> createClient(TransportClientFactory.java:179)
> at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(
> NettyRpcEnv.scala:197)
> at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.
> scala:191)
> at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.
> scala:187)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: /
> 192.168.123.43:58889
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(
> SocketChannelImpl.java:717)
> at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(
> NioSocketChannel.java:224)
> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.
> finishConnect(AbstractNioChannel.java:289)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(
> NioEventLoop.java:528)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(
> NioEventLoop.java:468)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(
> NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:111)
>   ... 1 more
>


Divide Spark Dataframe to parts by timestamp

2017-11-12 Thread Chetan Khatri
Hello All,

I have a Spark DataFrame with timestamps ranging from 2015-10-07 19:36:59 to
2017-01-01 18:53:23.

I want to split this DataFrame into 3 parts, and I wrote the code below to do
it. Can anyone please confirm whether this is the correct approach?

val finalDF1 = sampleDF.where(sampleDF.col("timestamp_col").gt("2017-01-01 23:59:59"))
val finalDF2 = sampleDF.where(sampleDF.col("timestamp_col").lt("2017-01-02 00:00:00") and
  sampleDF.col("timestamp_col").gt("2016-06-31 23:59:59"))
val finalDF3 = sampleDF.where(sampleDF.col("timestamp_col").lt("2016-06-30 00:00:00") or
  sampleDF.col("timestamp_col").isNull)
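
For what it's worth, a minimal sketch of a three-way split whose ranges neither
overlap nor leave gaps (the cutoff values are assumptions); note that in the
code above "2016-06-31" is not a valid date, and rows on 2016-06-30 fall into
none of the three parts:

val cutoff1 = "2016-06-30 23:59:59"   // end of the oldest bucket (assumed)
val cutoff2 = "2017-01-01 23:59:59"   // end of the middle bucket (assumed)
val part1 = sampleDF.where(sampleDF.col("timestamp_col").leq(cutoff1) or sampleDF.col("timestamp_col").isNull)
val part2 = sampleDF.where(sampleDF.col("timestamp_col").gt(cutoff1) and sampleDF.col("timestamp_col").leq(cutoff2))
val part3 = sampleDF.where(sampleDF.col("timestamp_col").gt(cutoff2))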

Thanks.


Re: Joining 3 tables with 17 billions records

2017-11-02 Thread Chetan Khatri
Jörn,

This is a kind of one-time load from historical data into the analytical Hive
engine. The Hive version is 1.2.1 and the Spark version is 2.0.1, on the MapR
distribution.

Writing every table to Parquet and reading it back could be very time
consuming; currently the entire job takes ~8 hours on an 8-node, 100 GB RAM,
20-core cluster, which is used not only by me but by a larger team.

Thanks


On Fri, Nov 3, 2017 at 1:31 AM, Jörn Franke <jornfra...@gmail.com> wrote:

> Hi,
>
> Do you have a more detailed log/error message?
> Also, can you please provide us details on the tables (no of rows,
> columns, size etc).
> Is this just a one time thing or something regular?
> If it is a one time thing then I would tend more towards putting each
> table in HDFS (parquet or ORC) and then join them.
> What is the Hive and Spark version?
>
> Best regards
>
> > On 2. Nov 2017, at 20:57, Chetan Khatri <chetan.opensou...@gmail.com>
> wrote:
> >
> > Hello Spark Developers,
> >
> > I have 3 tables that i am reading from HBase and wants to do join
> transformation and save to Hive Parquet external table. Currently my join
> is failing with container failed error.
> >
> > 1. Read table A from Hbase with ~17 billion records.
> > 2. repartition on primary key of table A
> > 3. create temp view of table A Dataframe
> > 4. Read table B from HBase with ~4 billion records
> > 5. repartition on primary key of table B
> > 6. create temp view of table B Dataframe
> > 7. Join both view of A and B and create Dataframe C
> > 8.  Join Dataframe C with table D
> > 9. coleance(20) to reduce number of file creation on already
> repartitioned DF.
> > 10. Finally store to external hive table with partition by skey.
> >
> > Any Suggestion or resources you come across please do share suggestions
> on this to optimize this.
> >
> > Thanks
> > Chetan
>


Joining 3 tables with 17 billions records

2017-11-02 Thread Chetan Khatri
Hello Spark Developers,

I have 3 tables that I am reading from HBase, and I want to do a join
transformation and save the result to a Hive Parquet external table. Currently
my join is failing with a container-failed error.

1. Read table A from Hbase with ~17 billion records.
2. repartition on primary key of table A
3. create temp view of table A Dataframe
4. Read table B from HBase with ~4 billion records
5. repartition on primary key of table B
6. create temp view of table B Dataframe
7. Join both view of A and B and create Dataframe C
8.  Join Dataframe C with table D
9. coalesce(20) to reduce the number of files created on the already
repartitioned DF.
10. Finally store to external hive table with partition by skey.
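
For reference, a minimal sketch of steps 2-10 above; the table, key and
database names are assumptions, and saveAsTable is used for brevity even
though the target here is an external table:

import org.apache.spark.sql.functions.col

// Sketch only: shuffle both large inputs on the join key before joining,
// then reduce the file count and write partitioned by skey.
val aDF = tableA_DF.repartition(col("a_key"))
val bDF = tableB_DF.repartition(col("a_key"))
val cDF = aDF.join(bDF, Seq("a_key"))                 // steps 2-7
val resultDF = cDF.join(tableD_DF, Seq("d_key"))      // step 8, join key assumed
resultDF.coalesce(20)                                 // step 9
  .write.mode("overwrite")
  .partitionBy("skey")
  .saveAsTable("analytics_db.target_table")           // step 10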

If you come across any suggestions or resources on how to optimize this,
please do share them.

Thanks
Chetan


Apache Spark Streaming / Spark SQL Job logs

2017-08-30 Thread Chetan Khatri
Hey Spark Dev,

Can anyone suggest sample Spark Streaming / Spark SQL job logs to download?
I want to play with log analytics.

Thanks


Re: Repartitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-03 Thread Chetan Khatri
Thanks Holden !


On Thu, Aug 3, 2017 at 4:02 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

> The memory overhead is based less on the total amount of data and more on
> what you end up doing with the data (e.g. if your doing a lot of off-heap
> processing or using Python you need to increase it). Honestly most people
> find this number for their job "experimentally" (e.g. they try a few
> different things).
>
> On Wed, Aug 2, 2017 at 1:52 PM, Chetan Khatri <chetan.opensou...@gmail.com
> > wrote:
>
>> Ryan,
>> Thank you for reply.
>>
>> For 2 TB of Data what should be the value of
>> spark.yarn.executor.memoryOverhead = ?
>>
>> with regards to this - i see issue at spark
>> https://issues.apache.org/jira/browse/SPARK-18787 , not sure whether it
>> works or not at Spark 2.0.1  !
>>
>> can you elaborate more for spark.memory.fraction setting.
>>
>> number of partitions = 674
>> Cluster: 455 GB total memory, VCores: 288, Nodes: 17
>> Given / tried memory config: executor-mem = 16g, num-executor=10,
>> executor cores=6, driver mem=4g
>>
>> spark.default.parallelism=1000
>> spark.sql.shuffle.partitions=1000
>> spark.yarn.executor.memoryOverhead=2048
>> spark.shuffle.io.preferDirectBufs=false
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Aug 2, 2017 at 10:43 PM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Chetan,
>>>
>>> When you're writing to a partitioned table, you want to use a shuffle to
>>> avoid the situation where each task has to write to every partition. You
>>> can do that either by adding a repartition by your table's partition keys,
>>> or by adding an order by with the partition keys and then columns you
>>> normally use to filter when reading the table. I generally recommend the
>>> second approach because it handles skew and prepares the data for more
>>> efficient reads.
>>>
>>> If that doesn't help, then you should look at your memory settings. When
>>> you're getting killed by YARN, you should consider setting `
>>> spark.shuffle.io.preferDirectBufs=false` so you use less off-heap
>>> memory that the JVM doesn't account for. That is usually an easier fix than
>>> increasing the memory overhead. Also, when you set executor memory, always
>>> change spark.memory.fraction to ensure the memory you're adding is used
>>> where it is needed. If your memory fraction is the default 60%, then 60% of
>>> the memory will be used for Spark execution, not reserved whatever is
>>> consuming it and causing the OOM. (If Spark's memory is too low, you'll see
>>> other problems like spilling too much to disk.)
>>>
>>> rb
>>>
>>> On Wed, Aug 2, 2017 at 9:02 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Can anyone please guide me with above issue.
>>>>
>>>>
>>>> On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri <
>>>> chetan.opensou...@gmail.com> wrote:
>>>>
>>>>> Hello Spark Users,
>>>>>
>>>>> I have Hbase table reading and writing to Hive managed table where i
>>>>> applied partitioning by date column which worked fine but it has generate
>>>>> more number of files in almost 700 partitions but i wanted to use
>>>>> reparation to reduce File I/O by reducing number of files inside each
>>>>> partition.
>>>>>
>>>>> *But i ended up with below exception:*
>>>>>
>>>>> ExecutorLostFailure (executor 11 exited caused by one of the running
>>>>> tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
>>>>> GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.
>>>>> memoryOverhead.
>>>>>
>>>>> Driver memory=4g, executor mem=12g, num-executors=8, executor core=8
>>>>>
>>>>> Do you think below setting can help me to overcome above issue:
>>>>>
>>>>> spark.default.parellism=1000
>>>>> spark.sql.shuffle.partitions=1000
>>>>>
>>>>> Because default max number of partitions are 1000.
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: Repartitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-02 Thread Chetan Khatri
Ryan,
Thank you for the reply.

For 2 TB of data, what should the value of
spark.yarn.executor.memoryOverhead be?

With regards to this, I see the issue at
https://issues.apache.org/jira/browse/SPARK-18787 and am not sure whether it
works or not on Spark 2.0.1.

Can you elaborate more on the spark.memory.fraction setting?

number of partitions = 674
Cluster: 455 GB total memory, VCores: 288, Nodes: 17
Given / tried memory config: executor-mem = 16g, num-executor=10, executor
cores=6, driver mem=4g

spark.default.parallelism=1000
spark.sql.shuffle.partitions=1000
spark.yarn.executor.memoryOverhead=2048
spark.shuffle.io.preferDirectBufs=false
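
For reference, a minimal sketch (hypothetical DataFrame, column and table
names) of the shuffle-before-write that Ryan describes in the quoted reply
below, so that each task writes to only a few Hive partitions instead of all
of them:

import org.apache.spark.sql.functions.col

// Sketch only: one shuffle groups rows by the table's partition key before the write.
hbaseInputDF
  .repartition(col("date_col"))
  .write
  .mode("append")
  .partitionBy("date_col")
  .saveAsTable("warehouse_db.partitioned_table")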









On Wed, Aug 2, 2017 at 10:43 PM, Ryan Blue <rb...@netflix.com> wrote:

> Chetan,
>
> When you're writing to a partitioned table, you want to use a shuffle to
> avoid the situation where each task has to write to every partition. You
> can do that either by adding a repartition by your table's partition keys,
> or by adding an order by with the partition keys and then columns you
> normally use to filter when reading the table. I generally recommend the
> second approach because it handles skew and prepares the data for more
> efficient reads.
>
> If that doesn't help, then you should look at your memory settings. When
> you're getting killed by YARN, you should consider setting `
> spark.shuffle.io.preferDirectBufs=false` so you use less off-heap memory
> that the JVM doesn't account for. That is usually an easier fix than
> increasing the memory overhead. Also, when you set executor memory, always
> change spark.memory.fraction to ensure the memory you're adding is used
> where it is needed. If your memory fraction is the default 60%, then 60% of
> the memory will be used for Spark execution, not reserved whatever is
> consuming it and causing the OOM. (If Spark's memory is too low, you'll see
> other problems like spilling too much to disk.)
>
> rb
>
> On Wed, Aug 2, 2017 at 9:02 AM, Chetan Khatri <chetan.opensou...@gmail.com
> > wrote:
>
>> Can anyone please guide me with above issue.
>>
>>
>> On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Spark Users,
>>>
>>> I have Hbase table reading and writing to Hive managed table where i
>>> applied partitioning by date column which worked fine but it has generate
>>> more number of files in almost 700 partitions but i wanted to use
>>> reparation to reduce File I/O by reducing number of files inside each
>>> partition.
>>>
>>> *But i ended up with below exception:*
>>>
>>> ExecutorLostFailure (executor 11 exited caused by one of the running
>>> tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
>>> GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.
>>> memoryOverhead.
>>>
>>> Driver memory=4g, executor mem=12g, num-executors=8, executor core=8
>>>
>>> Do you think below setting can help me to overcome above issue:
>>>
>>> spark.default.parellism=1000
>>> spark.sql.shuffle.partitions=1000
>>>
>>> Because default max number of partitions are 1000.
>>>
>>>
>>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Repartitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-02 Thread Chetan Khatri
Can anyone please guide me with the above issue?


On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

> Hello Spark Users,
>
> I have an HBase table that I am reading and writing into a Hive managed table,
> where I applied partitioning by a date column. That worked fine, but it
> generated a large number of files across almost 700 partitions, so I wanted to
> use repartitioning to reduce file I/O by reducing the number of files inside
> each partition.
>
> *But i ended up with below exception:*
>
> ExecutorLostFailure (executor 11 exited caused by one of the running
> tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
> GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.
> memoryOverhead.
>
> Driver memory=4g, executor mem=12g, num-executors=8, executor core=8
>
> Do you think below setting can help me to overcome above issue:
>
> spark.default.parallelism=1000
> spark.sql.shuffle.partitions=1000
>
> Because default max number of partitions are 1000.
>
>
>


Re: Support Dynamic Partition Inserts params with SET command in Spark 2.0.1

2017-07-28 Thread Chetan Khatri
I think it will be the same, but let me try that.

FYR - https://issues.apache.org/jira/browse/SPARK-19881
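
For reference, a minimal sketch of the two suggestions from this thread (the
value 2000 is the one tried in the quoted messages below):

// As Ayan suggests: run SET through the SQL interface
spark.sql("SET hive.exec.max.dynamic.partitions=2000")
// As Jörn suggests: set it on the session configuration
spark.conf.set("hive.exec.max.dynamic.partitions", "2000")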

On Fri, Jul 28, 2017 at 4:44 PM, ayan guha <guha.a...@gmail.com> wrote:

> Try running spark.sql("set yourconf=val")
>
> On Fri, 28 Jul 2017 at 8:51 pm, Chetan Khatri <chetan.opensou...@gmail.com>
> wrote:
>
>> Jorn, Both are same.
>>
>> On Fri, Jul 28, 2017 at 4:18 PM, Jörn Franke <jornfra...@gmail.com>
>> wrote:
>>
>>> Try sparksession.conf().set
>>>
>>> On 28. Jul 2017, at 12:19, Chetan Khatri <chetan.opensou...@gmail.com>
>>> wrote:
>>>
>>> Hey Dev/ USer,
>>>
>>> I am working with Spark 2.0.1 and with dynamic partitioning with Hive
>>> facing below issue:
>>>
>>> org.apache.hadoop.hive.ql.metadata.HiveException:
>>> Number of dynamic partitions created is 1344, which is more than 1000.
>>> To solve this try to set hive.exec.max.dynamic.partitions to at least
>>> 1344.
>>>
>>> I tried below options, but failed:
>>>
>>> val spark = sparkSession.builder().enableHiveSupport().getOrCreate()
>>>
>>> *spark.sqlContext.setConf("hive.exec.max.dynamic.partitions", "2000")*
>>>
>>> Please help with alternate workaround !
>>>
>>> Thanks
>>>
>>>
>> --
> Best Regards,
> Ayan Guha
>


Re: Support Dynamic Partition Inserts params with SET command in Spark 2.0.1

2017-07-28 Thread Chetan Khatri
Jorn, Both are same.

On Fri, Jul 28, 2017 at 4:18 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> Try sparksession.conf().set
>
> On 28. Jul 2017, at 12:19, Chetan Khatri <chetan.opensou...@gmail.com>
> wrote:
>
> Hey Dev/ USer,
>
> I am working with Spark 2.0.1 and with dynamic partitioning with Hive
> facing below issue:
>
> org.apache.hadoop.hive.ql.metadata.HiveException:
> Number of dynamic partitions created is 1344, which is more than 1000.
> To solve this try to set hive.exec.max.dynamic.partitions to at least
> 1344.
>
> I tried below options, but failed:
>
> val spark = sparkSession.builder().enableHiveSupport().getOrCreate()
>
> *spark.sqlContext.setConf("hive.exec.max.dynamic.partitions", "2000")*
>
> Please help with alternate workaround !
>
> Thanks
>
>


Support Dynamic Partition Inserts params with SET command in Spark 2.0.1

2017-07-28 Thread Chetan Khatri
Hey Dev / User,

I am working with Spark 2.0.1 and dynamic partitioning with Hive, and I am
facing the below issue:

org.apache.hadoop.hive.ql.metadata.HiveException:
Number of dynamic partitions created is 1344, which is more than 1000.
To solve this try to set hive.exec.max.dynamic.partitions to at least 1344.

I tried below options, but failed:

val spark = sparkSession.builder().enableHiveSupport().getOrCreate()

spark.sqlContext.setConf("hive.exec.max.dynamic.partitions", "2000")

Please help with an alternate workaround!

Thanks


Flatten JSON to multiple columns in Spark

2017-07-17 Thread Chetan Khatri
Hello Spark Dev's,

Can you please guide me on how to flatten JSON into multiple columns in Spark?

*Example:*

Columns: Sr No | Title | ISBN | Info
Row: 1 | Calculus Theory | 1234567890 | Info as below

[{"cert":[{
  "authSbmtr":"009415da-c8cd-418d-869e-0a19601d79fa",
  "certUUID":"03ea5a1a-5530-4fa3-8871-9d1ebac627c4",
  "effDt":"2016-05-06T15:04:56.279Z",
  "fileFmt":"rjrCsv",
  "status":"live"}],
 "expdCnt":"15",
 "mfgAcctNum":"531093",
 "oUUID":"23d07397-4fbe-4897-8a18-b79c9f64726c",
 "pgmRole":["RETAILER"],
 "pgmUUID":"1cb5dd63-817a-45bc-a15c-5660e4accd63",
 "regUUID":"cc1bd898-657d-40dc-af5d-4bf1569a1cc4",
 "rtlrsSbmtd":["009415da-c8cd-418d-869e-0a19601d79fa"]}]

I want to get single row with 11 columns.
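
For reference, a minimal sketch of one common approach, from_json plus nested
field selection; the schema covers only part of the sample above, the input
DataFrame and column names are assumptions, and from_json with an array schema
needs a reasonably recent Spark version:

import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types._

// Sketch only: partial schema for the Info JSON above; add the remaining fields the same way.
val infoSchema = ArrayType(new StructType()
  .add("expdCnt", StringType)
  .add("mfgAcctNum", StringType)
  .add("oUUID", StringType)
  .add("cert", ArrayType(new StructType()
    .add("authSbmtr", StringType)
    .add("certUUID", StringType)
    .add("status", StringType))))

val flatDF = booksDF                                               // hypothetical: Title, ISBN, Info columns
  .withColumn("info", explode(from_json(col("Info"), infoSchema)))
  .withColumn("cert", explode(col("info.cert")))
  .select(col("Title"), col("ISBN"),
          col("info.expdCnt"), col("info.mfgAcctNum"), col("info.oUUID"),
          col("cert.authSbmtr"), col("cert.certUUID"), col("cert.status"))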

Thanks.


Re: Issues: Generate JSON with null values in Spark 2.0.x

2017-03-20 Thread Chetan Khatri
Exactly.

On Sat, Mar 11, 2017 at 1:35 PM, Dongjin Lee <dong...@apache.org> wrote:

> Hello Chetan,
>
> Could you post some code? If I understood correctly, you are trying to
> save JSON like:
>
> {
>   "first_name": "Dongjin",
>   "last_name: null
> }
>
> not in omitted form, like:
>
> {
>   "first_name": "Dongjin"
> }
>
> right?
>
> - Dongjin
>
> On Wed, Mar 8, 2017 at 5:58 AM, Chetan Khatri <chetan.opensou...@gmail.com
> > wrote:
>
>> Hello Dev / Users,
>>
>> I am working with PySpark Code migration to scala, with Python -
>> Iterating Spark with dictionary and generating JSON with null is possible
>> with json.dumps() which will be converted to SparkSQL[Row] but in scala how
>> can we generate json will null values as a Dataframe ?
>>
>> Thanks.
>>
>
>
>
> --
> *Dongjin Lee*
>
>
> *Software developer in Line+.So interested in massive-scale machine
> learning.facebook: www.facebook.com/dongjin.lee.kr
> <http://www.facebook.com/dongjin.lee.kr>linkedin: 
> kr.linkedin.com/in/dongjinleekr
> <http://kr.linkedin.com/in/dongjinleekr>github:
> <http://goog_969573159/>github.com/dongjinleekr
> <http://github.com/dongjinleekr>twitter: www.twitter.com/dongjinleekr
> <http://www.twitter.com/dongjinleekr>*
>


Issues: Generate JSON with null values in Spark 2.0.x

2017-03-07 Thread Chetan Khatri
Hello Dev / Users,

I am working on migrating PySpark code to Scala. In Python, iterating over
Spark data with dictionaries and generating JSON with nulls is possible with
json.dumps(), which is then converted to a SparkSQL Row; but in Scala, how can
we generate JSON with null values as a DataFrame?

Thanks.


Re: Spark Job Performance monitoring approaches

2017-02-15 Thread Chetan Khatri
Thank you Georg

On Thu, Feb 16, 2017 at 12:30 PM, Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

> I know of the following tools
> https://sites.google.com/site/sparkbigdebug/home https://
> engineering.linkedin.com/blog/2016/04/dr-elephant-open-
> source-self-serve-performance-tuning-hadoop-spark https://
> github.com/SparkMonitor/varOne https://github.com/groupon/sparklint
>
> Chetan Khatri <chetan.opensou...@gmail.com> schrieb am Do., 16. Feb. 2017
> um 06:15 Uhr:
>
>> Hello All,
>>
>> What would be the best approches to monitor Spark Performance, is there
>> any tools for Spark Job Performance monitoring ?
>>
>> Thanks.
>>
>


Spark Job Performance monitoring approaches

2017-02-15 Thread Chetan Khatri
Hello All,

What would be the best approaches to monitor Spark performance? Are there any
tools for Spark job performance monitoring?

Thanks.


Re: Update Public Documentation - SparkSession instead of SparkContext

2017-02-15 Thread Chetan Khatri
Sorry, the context I am referring to is the URL below:
http://spark.apache.org/docs/2.0.1/programming-guide.html



On Wed, Feb 15, 2017 at 1:12 PM, Sean Owen <so...@cloudera.com> wrote:

> When asking a question like this, please actually link to what you are
> referring to. Some is intended.
>
>
> On Wed, Feb 15, 2017, 06:44 Chetan Khatri <chetan.opensou...@gmail.com>
> wrote:
>
>> Hello Spark Dev Team,
>>
>> I was working with my team having most of the confusion that why your
>> public documentation is not updated with SparkSession if SparkSession is
>> the ongoing extension and best practice instead of creating sparkcontext.
>>
>> Thanks.
>>
>


Update Public Documentation - SparkSession instead of SparkContext

2017-02-14 Thread Chetan Khatri
Hello Spark Dev Team,

I was working with my team, and much of the confusion was about why the public
documentation is not updated to use SparkSession, given that SparkSession is
the ongoing extension and the best practice instead of creating a SparkContext.

Thanks.


Re: Error Saving Dataframe to Hive with Spark 2.0.0

2017-01-29 Thread Chetan Khatri
Okay, you are saying that 2.0.0 doesn't have that patch? (cc dev)
I don't like changing the service versions every time!

Thanks.

On Mon, Jan 30, 2017 at 1:10 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> I think you have to upgrade to 2.1.0. There were few changes wrt the ERROR
> since.
>
> Jacek
>
>
> On 29 Jan 2017 9:24 a.m., "Chetan Khatri" <chetan.opensou...@gmail.com>
> wrote:
>
> Hello Spark Users,
>
> I am getting error while saving Spark Dataframe to Hive Table:
> Hive 1.2.1
> Spark 2.0.0
> Local environment.
> Note: Job is getting executed successfully and the way I want but still
> Exception raised.
> *Source Code:*
>
> package com.chetan.poc.hbase
>
> /**
>   * Created by chetan on 24/1/17.
>   */
> import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.KeyValue.Type
> import org.apache.spark.sql.SparkSession
> import scala.collection.JavaConverters._
> import java.util.Date
> import java.text.SimpleDateFormat
>
>
> object IncrementalJob {
> val APP_NAME: String = "SparkHbaseJob"
> var HBASE_DB_HOST: String = null
> var HBASE_TABLE: String = null
> var HBASE_COLUMN_FAMILY: String = null
> var HIVE_DATA_WAREHOUSE: String = null
> var HIVE_TABLE_NAME: String = null
>   def main(args: Array[String]) {
> // Initializing HBASE Configuration variables
> HBASE_DB_HOST="127.0.0.1"
> HBASE_TABLE="university"
> HBASE_COLUMN_FAMILY="emp"
> // Initializing Hive Metastore configuration
> HIVE_DATA_WAREHOUSE = "/usr/local/hive/warehouse"
> // Initializing Hive table name - Target table
> HIVE_TABLE_NAME = "employees"
> // setting spark application
> // val sparkConf = new SparkConf().setAppName(APP_NAME).setMaster("local")
> //initialize the spark context
> //val sparkContext = new SparkContext(sparkConf)
> //val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
> // Enable Hive with Hive warehouse in SparkSession
> val spark = 
> SparkSession.builder().appName(APP_NAME).config("hive.metastore.warehouse.dir",
>  HIVE_DATA_WAREHOUSE).config("spark.sql.warehouse.dir", 
> HIVE_DATA_WAREHOUSE).enableHiveSupport().getOrCreate()
> import spark.implicits._
> import spark.sql
>
> val conf = HBaseConfiguration.create()
> conf.set(TableInputFormat.INPUT_TABLE, HBASE_TABLE)
> conf.set(TableInputFormat.SCAN_COLUMNS, HBASE_COLUMN_FAMILY)
> // Load an RDD of rowkey, result(ImmutableBytesWritable, Result) tuples 
> from the table
> val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(conf, 
> classOf[TableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
>
> println(hBaseRDD.count())
> //hBaseRDD.foreach(println)
>
> //keyValue is a RDD[java.util.list[hbase.KeyValue]]
> val keyValue = hBaseRDD.map(x => x._2).map(_.list)
>
> //outPut is a RDD[String], in which each line represents a record in HBase
> val outPut = keyValue.flatMap(x =>  x.asScala.map(cell =>
>
>   HBaseResult(
> Bytes.toInt(CellUtil.cloneRow(cell)),
> Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
> cell.getTimestamp,
> new SimpleDateFormat("-MM-dd HH:mm:ss:SSS").format(new 
> Date(cell.getTimestamp.toLong)),
> Bytes.toStringBinary(CellUtil.cloneValue(cell)),
> Type.codeToType(cell.getTypeByte).toString
> )
>   )
> ).toDF()
> // Output dataframe
> outPut.show
>
> // get timestamp
> val datetimestamp_threshold = "2016-08-25 14:27:02:001"
> val datetimestampformat = new SimpleDateFormat("-MM-dd 
> HH:mm:ss:SSS").parse(datetimestamp_threshold).getTime()
>
> // Resultset filteration based on timestamp
> val filtered_output_timestamp = outPut.filter($"colDatetime" >= 
> datetimestampformat)
> // Resultset filteration based on rowkey
> val filtered_output_row = 
> outPut.filter($"colDatetime".between(1668493360,1668493365))
>
>
> // Saving Dataframe to Hive Table Successfully.
> 
> filtered_output_timestamp.write.mode("append").saveAsTable(HIVE_TABLE_NAME)
>   }
>   case class HBaseResult(rowkey: Int, colFamily: String, colQualifier: 
> String, colDate

Re: HBaseContext with Spark

2017-01-27 Thread Chetan Khatri
Storage handler bulk load:

SET hive.hbase.bulk=true;
INSERT OVERWRITE TABLE users SELECT … ;

But for now, you have to do some work and issue multiple Hive commands:
1. Sample source data for range partitioning
2. Save sampling results to a file
3. Run a CLUSTER BY query using HiveHFileOutputFormat and TotalOrderPartitioner
   (sorts data, producing a large number of region files)
4. Import HFiles into HBase
5. HBase can merge files if necessary

On Sat, Jan 28, 2017 at 11:32 AM, Chetan Khatri <chetan.opensou...@gmail.com
> wrote:

> @Ted, I dont think so.
>
> On Thu, Jan 26, 2017 at 6:35 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Does the storage handler provide bulk load capability ?
>>
>> Cheers
>>
>> On Jan 25, 2017, at 3:39 AM, Amrit Jangid <amrit.jan...@goibibo.com>
>> wrote:
>>
>> Hi chetan,
>>
>> If you just need HBase Data into Hive, You can use Hive EXTERNAL TABLE
>> with
>>
>> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'.
>>
>>
>> Try this if you problem can be solved
>>
>>
>> https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration
>>
>>
>> Regards
>>
>> Amrit
>>
>>
>> .
>>
>> On Wed, Jan 25, 2017 at 5:02 PM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Spark Community Folks,
>>>
>>> Currently I am using HBase 1.2.4 and Hive 1.2.1, I am looking for Bulk
>>> Load from Hbase to Hive.
>>>
>>> I have seen couple of good example at HBase Github Repo:
>>> https://github.com/apache/hbase/tree/master/hbase-spark
>>>
>>> If I would like to use HBaseContext with HBase 1.2.4, how it can be done
>>> ? Or which version of HBase has more stability with HBaseContext ?
>>>
>>> Thanks.
>>>
>>
>>
>>
>>
>>
>


Re: HBaseContext with Spark

2017-01-27 Thread Chetan Khatri
@Ted, I dont think so.

On Thu, Jan 26, 2017 at 6:35 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Does the storage handler provide bulk load capability ?
>
> Cheers
>
> On Jan 25, 2017, at 3:39 AM, Amrit Jangid <amrit.jan...@goibibo.com>
> wrote:
>
> Hi chetan,
>
> If you just need HBase Data into Hive, You can use Hive EXTERNAL TABLE
> with
>
> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'.
>
>
> Try this if you problem can be solved
>
>
> https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration
>
>
> Regards
>
> Amrit
>
>
> .
>
> On Wed, Jan 25, 2017 at 5:02 PM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Hello Spark Community Folks,
>>
>> Currently I am using HBase 1.2.4 and Hive 1.2.1, I am looking for Bulk
>> Load from Hbase to Hive.
>>
>> I have seen couple of good example at HBase Github Repo:
>> https://github.com/apache/hbase/tree/master/hbase-spark
>>
>> If I would like to use HBaseContext with HBase 1.2.4, how it can be done
>> ? Or which version of HBase has more stability with HBaseContext ?
>>
>> Thanks.
>>
>
>
>
>
>


Re: HBaseContext with Spark

2017-01-25 Thread Chetan Khatri
@Ted Yu, correct, but the hbase-spark module available in the HBase repository
seems too old and the code is not optimized yet; I have already submitted a PR
for the same. I don't know whether it is clearly mentioned that it is now part
of HBase itself, since people are still committing to the older repo where the
original code is still old. [1]

Other sources have updated info [2]

Ref.
[1]
http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/
[2] https://github.com/cloudera-labs/SparkOnHBase ,
https://github.com/esamson/SparkOnHBase

On Wed, Jan 25, 2017 at 8:13 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Though no hbase release has the hbase-spark module, you can find the
> backport patch on HBASE-14160 (for Spark 1.6)
>
> You can build the hbase-spark module yourself.
>
> Cheers
>
> On Wed, Jan 25, 2017 at 3:32 AM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Hello Spark Community Folks,
>>
>> Currently I am using HBase 1.2.4 and Hive 1.2.1, I am looking for Bulk
>> Load from Hbase to Hive.
>>
>> I have seen couple of good example at HBase Github Repo:
>> https://github.com/apache/hbase/tree/master/hbase-spark
>>
>> If I would like to use HBaseContext with HBase 1.2.4, how it can be done
>> ? Or which version of HBase has more stability with HBaseContext ?
>>
>> Thanks.
>>
>
>


HBaseContext with Spark

2017-01-25 Thread Chetan Khatri
Hello Spark Community Folks,

Currently I am using HBase 1.2.4 and Hive 1.2.1, and I am looking for a bulk
load from HBase to Hive.

I have seen a couple of good examples in the HBase GitHub repo:
https://github.com/apache/hbase/tree/master/hbase-spark

If I would like to use HBaseContext with HBase 1.2.4, how can it be done?
Or which version of HBase is more stable with HBaseContext?

Thanks.


Re: Weird experience Hive with Spark Transformations

2017-01-17 Thread Chetan Khatri
But Hive 1.2.1 does not ship a hive-site.xml; I tried to add my own, which
caused several other issues. On the other hand, it works well for me with
Hive 2.0.1, where the hive-site.xml content was as below and was copied to
spark/conf too; that worked.

*5. hive-site.xml configuration setup*


Add below at conf/hive-site.xml , if not there then create it.




<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost/metastore?createDatabaseIfNotExist=true</value>
  <description>metadata is stored in a MySQL server</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
  <description>MySQL JDBC driver class</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hiveuser</value>
  <description>user name for connecting to mysql server</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hivepassword</value>
  <description>password for connecting to mysql server</description>
</property>




Replace the 3 properties below with whatever already exists by default;
otherwise it will throw an error:

"java.net.URISyntaxException: Relative path in absolute URI:
${system:java.io.tmpdir%7D/$%7Bsystem:user.name%7D"




<property>
  <name>hive.querylog.location</name>
  <value>$HIVE_HOME/iotmp</value>
  <description>Location of Hive run time structured log file</description>
</property>

<property>
  <name>hive.exec.local.scratchdir</name>
  <value>$HIVE_HOME/iotmp</value>
  <description>Local scratch space for Hive jobs</description>
</property>

<property>
  <name>hive.downloaded.resources.dir</name>
  <value>$HIVE_HOME/iotmp</value>
  <description>Temporary local directory for added resources in the remote file system.</description>
</property>





On Tue, Jan 17, 2017 at 10:01 PM, Dongjoon Hyun <dongj...@apache.org> wrote:

> Hi, Chetan.
>
> Did you copy your `hive-site.xml` into Spark conf directory? For example,
>
> cp /usr/local/hive/conf/hive-site.xml /usr/local/spark/conf
>
> If you want to use the existing Hive metastore, you need to provide that
> information to Spark.
>
> Bests,
> Dongjoon.
>
> On 2017-01-16 21:36 (-0800), Chetan Khatri <chetan.opensou...@gmail.com>
> wrote:
> > Hello,
> >
> > I have following services are configured and installed successfully:
> >
> > Hadoop 2.7.x
> > Spark 2.0.x
> > HBase 1.2.4
> > Hive 1.2.1
> >
> > *Installation Directories:*
> >
> > /usr/local/hadoop
> > /usr/local/spark
> > /usr/local/hbase
> >
> > *Hive Environment variables:*
> >
> > #HIVE VARIABLES START
> > export HIVE_HOME=/usr/local/hive
> > export PATH=$PATH:$HIVE_HOME/bin
> > #HIVE VARIABLES END
> >
> > So, I can access Hive from anywhere as environment variables are
> > configured. Now if if i start my spark-shell & hive from location
> > /usr/local/hive then both work good for hive-metastore other wise from
> > where i start spark-shell where spark creates own meta-store.
> >
> > i.e I am reading from HBase and Writing to Hive using Spark. I dont know
> > why this is weird issue is.
> >
> >
> >
> >
> > Thanks.
> >
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Weird experience Hive with Spark Transformations

2017-01-16 Thread Chetan Khatri
Hello,

I have following services are configured and installed successfully:

Hadoop 2.7.x
Spark 2.0.x
HBase 1.2.4
Hive 1.2.1

*Installation Directories:*

/usr/local/hadoop
/usr/local/spark
/usr/local/hbase

*Hive Environment variables:*

#HIVE VARIABLES START
export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin
#HIVE VARIABLES END

So I can access Hive from anywhere, as the environment variables are
configured. Now, if I start my spark-shell and Hive from the /usr/local/hive
location, both work fine against the Hive metastore; otherwise Spark creates
its own metastore in whatever directory I start spark-shell from.

i.e. I am reading from HBase and writing to Hive using Spark. I don't know
why this weird issue happens.




Thanks.


Re: About saving DataFrame to Hive 1.2.1 with Spark 2.0.1

2017-01-16 Thread Chetan Khatri
va:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
at
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
at
scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
at
scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at
scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at
scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at
scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
at org.apache.spark.repl.Main$.doMain(Main.scala:68)
at org.apache.spark.repl.Main$.main(Main.scala:51)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

stdDf: org.apache.spark.sql.DataFrame = [stid: string, name: string ... 3
more fields]

again same works without exception:

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> val stdDf = sqlContext.createDataFrame(rowRDD,empSchema.struct);
stdDf: org.apache.spark.sql.DataFrame = [stid: string, name: string ... 3
more fields]


Thanks.


On Tue, Jan 17, 2017 at 12:48 AM, Chetan Khatri <chetan.opensou...@gmail.com
> wrote:

> Hello Community,
>
> I am struggling to save Dataframe to Hive Table,
>
> Versions:
>
> Hive 1.2.1
> Spark 2.0.1
>
> *Working code:*
>
> /* @Author: Chetan Khatri
>    Description: This Scala script has been written for the HBase to Hive module,
>    which reads a table from HBase and dumps it out to Hive. */
> import it.nerdammer.spark.hbase._
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.StructType
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StringType
> import org.apache.spark.sql.SparkSession
>
> // Approach 1:
> // Read the HBase table
> val hBaseRDD = sc.hbaseTable[(Option[String], Option[String], Option[String],
>   Option[String], Option[String])]("university").select("stid",
>   "name", "subject", "grade", "city").inColumnFamily("emp")
>
> // Iterate over hBaseRDD and generate an RDD[Row]
> val rowRDD = hBaseRDD.map(i => Row(i._1.get, i._2.get, i._3.get, i._4.get, i._5.get))
>
> // Create a sqlContext for the createDataFrame method
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> // Create the schema structure
> object empSchema {
>   val stid = StructField("stid", StringType)
>   val name = StructField("name", StringType)
>   val subject = StructField("subject", StringType)
>   val grade = StructField("grade", StringType)
>   val city = StructField("city", StringType)
>   val struct = StructType(Array(stid, name, subject, grade, city))
> }
>
> import sqlContext.implicits._
>
> // Create a DataFrame from rowRDD and the schema structure
> val stdDf = sqlContext.createDataFrame(rowRDD, empSchema.struct)
>
> // Importing Hive
> import org.apache.spark.sql.hive
>
> // Enable Hive with the Hive warehouse in SparkSession
> val spark = SparkSession.builder().appName("Spark Hive Example")
>   .config("spark.sql.warehouse.dir", "/usr/local/hive/warehouse/")
>   .enableHiveSupport().getOrCreate()
>
> // Saving the DataFrame to the Hive table succeeds here.
> stdDf.write.mode("append").saveAsTable("employee")
>
> // Approach 2: where the error comes from
> import spark.implicits._

About saving DataFrame to Hive 1.2.1 with Spark 2.0.1

2017-01-16 Thread Chetan Khatri
Hello Community,

I am struggling to save a DataFrame to a Hive table.

Versions:

Hive 1.2.1
Spark 2.0.1

*Working code:*

/* @Author: Chetan Khatri
   Description: This Scala script has been written for the HBase to Hive module,
   which reads a table from HBase and dumps it out to Hive. */
import it.nerdammer.spark.hbase._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.SparkSession

// Approach 1:
// Read the HBase table
val hBaseRDD = sc.hbaseTable[(Option[String], Option[String], Option[String],
  Option[String], Option[String])]("university").select("stid",
  "name", "subject", "grade", "city").inColumnFamily("emp")

// Iterate over hBaseRDD and generate an RDD[Row]
val rowRDD = hBaseRDD.map(i => Row(i._1.get, i._2.get, i._3.get, i._4.get, i._5.get))

// Create a sqlContext for the createDataFrame method
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the schema structure
object empSchema {
  val stid = StructField("stid", StringType)
  val name = StructField("name", StringType)
  val subject = StructField("subject", StringType)
  val grade = StructField("grade", StringType)
  val city = StructField("city", StringType)
  val struct = StructType(Array(stid, name, subject, grade, city))
}

import sqlContext.implicits._

// Create a DataFrame from rowRDD and the schema structure
val stdDf = sqlContext.createDataFrame(rowRDD, empSchema.struct)

// Importing Hive
import org.apache.spark.sql.hive

// Enable Hive with the Hive warehouse in SparkSession
val spark = SparkSession.builder().appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", "/usr/local/hive/warehouse/")
  .enableHiveSupport().getOrCreate()

// Saving the DataFrame to the Hive table succeeds here.
stdDf.write.mode("append").saveAsTable("employee")

// Approach 2: where the error comes from
import spark.implicits._
import spark.sql

sql("use default")
sql("create table employee(stid STRING, name STRING, subject STRING, grade STRING, city STRING)")

scala> sql("show TABLES").show()
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| employee|      false|
+---------+-----------+

stdDf.write.mode("append").saveAsTable("employee")

ERROR Exception:
org.apache.spark.sql.AnalysisException: Saving data in MetastoreRelation default, employee is not supported.;
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:221)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:378)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)
... 56 elided

Questions:

1. In Approach 1, the data is stored even though the Hive table was not previously created: when I call saveAsTable it automatically creates the table for me, and on the next run it appends data into it. How do I store data in a previously created table?

2. It also gives the warning "WARN metastore.HiveMetaStore: Location: file:/usr/local/spark/spark-warehouse/employee specified for non-external table:employee", but I have already provided the path of the Hive warehouse, so why is it storing data in Spark's own warehouse / metastore?
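For the first question, a workaround often suggested for Spark 2.0.x (a sketch only,
not verified against this exact setup) is to write into the pre-created Hive table
with insertInto, which matches columns by position, instead of saveAsTable:

// Assumes the table "employee" was created beforehand with Hive DDL, as in Approach 2,
// and that stdDf's column order matches the table definition.
stdDf.write.mode("append").insertInto("employee")

// Alternatively, go through SQL after registering a temporary view.
stdDf.createOrReplaceTempView("std_stage")
spark.sql("INSERT INTO TABLE employee SELECT * FROM std_stage")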

The Hive setup was done with reference to:
http://mitu.co.in/wp-content/uploads/2015/12/Hive-Installation-on-Ubuntu-14.04-and-Hadoop-2.6.3.pdf
and it is working well. I cannot change the Hive version; it must be 1.2.1.

Thank you.


Re: Error at starting Phoenix shell with HBase

2017-01-15 Thread Chetan Khatri
Are there any updates on the above error, guys?


On Fri, Jan 13, 2017 at 9:35 PM, Josh Elser <els...@apache.org> wrote:

> (-cc dev@phoenix)
>
> phoenix-4.8.2-HBase-1.2-server.jar in the top-level binary tarball of
> Apache Phoenix 4.8.0 is the jar which is meant to be deployed to all
> HBase's classpath.
>
> I would check the RegionServer logs -- I'm guessing that it never started
> correctly or failed. The error message is saying that certain regions in
> the system were never assigned to a RegionServer which only happens in
> exceptional cases.
>
> Chetan Khatri wrote:
>
>> Hello Community,
>>
>> I have installed and configured Apache Phoenix on Single Node Ubuntu 16.04
>> machine:
>> - Hadoop 2.7
>> - HBase 1.2.4
>> - Phoenix -4.8.2-HBase-1.2
>>
>> Copied phoenix-core-4.8.2-HBase-1.2.jar to hbase/lib and confirmed
>> with bin/hbase classpath | grep 'phoenix' and I am using embedded
>> zookeeper, so my hbase-site.xml looks like below:
>>
>> <configuration>
>>   <property>
>>     <name>hbase.rootdir</name>
>>     <value>file:///home/hduser/hbase</value>
>>   </property>
>> </configuration>
>>
>> I am able to read / write to HBase from shell and Apache Spark.
>>
>> *Errors while accessing with **sqlline**:*
>>
>>
>> 1) bin/sqlline.py localhost:2181
>>
>> Error:
>>
>> 1. Command made process hang.
>> 2.
>> Error: ERROR 1102 (XCL02): Cannot get all table regions.
>> (state=XCL02,code=1102)
>> java.sql.SQLException: ERROR 1102 (XCL02): Cannot get all table regions.
>> at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:455)
>> at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
>> at org.apache.phoenix.query.ConnectionQueryServicesImpl.getAllTableRegions(ConnectionQueryServicesImpl.java:546)
>> at org.apache.phoenix.query.ConnectionQueryServicesImpl.checkClientServerCompatibility(ConnectionQueryServicesImpl.java:1162)
>> at org.apache.phoenix.query.ConnectionQueryServicesImpl.ensureTableCreated(ConnectionQueryServicesImpl.java:1068)
>> at org.apache.phoenix.query.ConnectionQueryServicesImpl.createTable(ConnectionQueryServicesImpl.java:1388)
>> at org.apache.phoenix.schema.MetaDataClient.createTableInternal(MetaDataClient.java:2298)
>> at org.apache.phoenix.schema.MetaDataClient.createTable(MetaDataClient.java:940)
>> at org.apache.phoenix.compile.CreateTableCompiler$2.execute(CreateTableCompiler.java:193)
>> at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:344)
>> at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:332)
>> at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
>> at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:331)
>> at org.apache.phoenix.jdbc.PhoenixStatement.executeUpdate(PhoenixStatement.java:1423)
>> at org.apache.phoenix.query.ConnectionQueryServicesImpl$13.call(ConnectionQueryServicesImpl.java:2352)
>> at org.apache.phoenix.query.ConnectionQueryServicesImpl$13.call(ConnectionQueryServicesImpl.java:2291)
>> at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)
>> at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:2291)
>> at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:232)
>> at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:147)
>> at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:202)
>> at sqlline.DatabaseConnection.connect(DatabaseConnection.java:157)
>> at sqlline.DatabaseConnection.getConnection(DatabaseConnection.java:203)
>> at sqlline.Commands.connect(Commands.java:1064)
>> at sqlline.Commands.connect(Commands.java:996)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at sqlline.ReflectiveCommandHandler.execute(ReflectiveCommandHandler.java:36)
>> at sqlline.SqlLine.dispatch(SqlLine.java:803)
>> at sqlline.SqlLine.initArgs(SqlLi

Re: Approach: Incremental data load from HBASE

2017-01-06 Thread Chetan Khatri
Hi Ayan,

By incremental load from HBase I mean that a weekly batch job takes rows from an
HBase table and dumps them out to Hive. The next time the job runs, it should pick
up only the newly arrived rows.

This is the same as using Sqoop for an incremental load from an RDBMS to Hive with a
command like the one below:

sqoop job --create myssb1 -- import --connect
jdbc:mysql://:/sakila --username admin --password admin
--driver=com.mysql.jdbc.Driver --query "SELECT address_id, address,
district, city_id, postal_code, alast_update, cityid, city, country_id,
clast_update FROM(SELECT a.address_id as address_id, a.address as address,
a.district as district, a.city_id as city_id, a.postal_code as postal_code,
a.last_update as alast_update, c.city_id as cityid, c.city as city,
c.country_id as country_id, c.last_update as clast_update FROM
sakila.address a INNER JOIN sakila.city c ON a.city_id=c.city_id) as sub
WHERE $CONDITIONS" --incremental lastmodified --check-column alast_update
--last-value 1900-01-01 --target-dir /user/cloudera/ssb7 --hive-import
--hive-table test.sakila -m 1 --hive-drop-import-delims --map-column-java
address=String

I am probably looking for a tool from the HBase / incubator family which does the
job for me; otherwise, an alternative approach is to read the HBase table into an
RDD and save that RDD to Hive.
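For what it's worth, a minimal sketch of that RDD/DataFrame route, assuming the HBase
snapshot is already exposed to Spark as a table and that each row carries a
last-update column (both assumptions; all names below are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max}

val spark = SparkSession.builder().appName("HBase incremental load to Hive")
  .enableHiveSupport().getOrCreate()

// Assumption: the HBase table has already been read into Spark (for example via the
// RDD-of-Row plus schema approach discussed in this thread) and registered as a table.
val hbaseDF = spark.table("staging.hbase_snapshot")

// Watermark from the previous run; in practice it would be read from a small file or table.
val lastWatermark = "2016-12-31 00:00:00"

// Keep only rows that changed since the last run and append them to the Hive table.
val incrementalDF = hbaseDF.filter(col("last_update") > lastWatermark)
incrementalDF.write.mode("append").saveAsTable("test.sakila_from_hbase")

// New watermark to persist for the next run (assumes last_update is a string timestamp).
val newWatermark = incrementalDF.agg(max("last_update")).first().getString(0)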

Thanks.


On Thu, Jan 5, 2017 at 2:02 AM, ayan guha <guha.a...@gmail.com> wrote:

> Hi Chetan
>
> What do you mean by incremental load from HBase? There is a timestamp
> marker for each cell, but not at Row level.
>
> On Wed, Jan 4, 2017 at 10:37 PM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Ted Yu,
>>
>> You understood wrong, i said Incremental load from HBase to Hive,
>> individually you can say Incremental Import from HBase.
>>
>> On Wed, Dec 21, 2016 at 10:04 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Incremental load traditionally means generating hfiles and
>>> using org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load
>>> the data into hbase.
>>>
>>> For your use case, the producer needs to find rows where the flag is 0
>>> or 1.
>>> After such rows are obtained, it is up to you how the result of
>>> processing is delivered to hbase.
>>>
>>> Cheers
>>>
>>> On Wed, Dec 21, 2016 at 8:00 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Ok, Sure will ask.
>>>>
>>>> But what would be generic best practice solution for Incremental load
>>>> from HBASE.
>>>>
>>>> On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> I haven't used Gobblin.
>>>>> You can consider asking Gobblin mailing list of the first option.
>>>>>
>>>>> The second option would work.
>>>>>
>>>>>
>>>>> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>
>>>>>> Hello Guys,
>>>>>>
>>>>>> I would like to understand different approach for Distributed
>>>>>> Incremental load from HBase, Is there any *tool / incubactor tool* which
>>>>>> satisfy requirement ?
>>>>>>
>>>>>> *Approach 1:*
>>>>>>
>>>>>> Write Kafka Producer and maintain manually column flag for events and
>>>>>> ingest it with Linkedin Gobblin to HDFS / S3.
>>>>>>
>>>>>> *Approach 2:*
>>>>>>
>>>>>> Run Scheduled Spark Job - Read from HBase and do transformations and
>>>>>> maintain flag column at HBase Level.
>>>>>>
>>>>>> In above both approach, I need to maintain column level flags. such
>>>>>> as 0 - by default, 1-sent,2-sent and acknowledged. So next time Producer
>>>>>> will take another 1000 rows of batch where flag is 0 or 1.
>>>>>>
>>>>>> I am looking for best practice approach with any distributed tool.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> - Chetan Khatri
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Approach: Incremental data load from HBASE

2017-01-04 Thread Chetan Khatri
Ted Yu,

I think you misunderstood: I meant incremental load from HBase to Hive; taken on its
own, you could call it incremental import from HBase.

On Wed, Dec 21, 2016 at 10:04 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Incremental load traditionally means generating hfiles and
> using org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load the
> data into hbase.
>
> For your use case, the producer needs to find rows where the flag is 0 or
> 1.
> After such rows are obtained, it is up to you how the result of processing
> is delivered to hbase.
>
> Cheers
>
> On Wed, Dec 21, 2016 at 8:00 AM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Ok, Sure will ask.
>>
>> But what would be generic best practice solution for Incremental load
>> from HBASE.
>>
>> On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> I haven't used Gobblin.
>>> You can consider asking Gobblin mailing list of the first option.
>>>
>>> The second option would work.
>>>
>>>
>>> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Hello Guys,
>>>>
>>>> I would like to understand different approach for Distributed
>>>> Incremental load from HBase, Is there any *tool / incubactor tool* which
>>>> satisfy requirement ?
>>>>
>>>> *Approach 1:*
>>>>
>>>> Write Kafka Producer and maintain manually column flag for events and
>>>> ingest it with Linkedin Gobblin to HDFS / S3.
>>>>
>>>> *Approach 2:*
>>>>
>>>> Run Scheduled Spark Job - Read from HBase and do transformations and
>>>> maintain flag column at HBase Level.
>>>>
>>>> In above both approach, I need to maintain column level flags. such as
>>>> 0 - by default, 1-sent,2-sent and acknowledged. So next time Producer will
>>>> take another 1000 rows of batch where flag is 0 or 1.
>>>>
>>>> I am looking for best practice approach with any distributed tool.
>>>>
>>>> Thanks.
>>>>
>>>> - Chetan Khatri
>>>>
>>>
>>>
>>
>


Re: Apache Hive with Spark Configuration

2017-01-04 Thread Chetan Khatri
Ryan,

I agree that Hive 1.2.1 works reliably with Spark 2.x, but I went with the current
stable version of Hive, which is 2.0.1, and I am working with that. It seems fine,
but I wanted to make sure which version of Hive is more reliable with Spark 2.x, and
I think, @Ryan, you answered exactly that: Hive 1.2.1.

Thanks.



On Wed, Jan 4, 2017 at 2:02 AM, Ryan Blue <rb...@netflix.com> wrote:

> Chetan,
>
> Spark is currently using Hive 1.2.1 to interact with the Metastore. Using
> that version for Hive is going to be the most reliable, but the metastore
> API doesn't change very often and we've found (from having different
> versions as well) that older versions are mostly compatible. Some things
> fail occasionally, but we haven't had too many problems running different
> versions with the same metastore in practice.
>
> rb
>
> On Wed, Dec 28, 2016 at 4:22 AM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Hello Users / Developers,
>>
>> I am using Hive 2.0.1 with MySql as a Metastore, can you tell me which
>> version is more compatible with Spark 2.0.2 ?
>>
>> THanks
>>
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Error: at sqlContext.createDataFrame with RDD and Schema

2016-12-28 Thread Chetan Khatri
I resolved the above error by creating a SparkSession:

val spark = SparkSession.builder().appName("Hbase - Spark
POC").getOrCreate()

Error after:

spark.sql("SELECT * FROM student").show()

But calling the show() action on the DataFrame throws the error below:

scala> sqlContext.sql("select * from student").show()
16/12/28 21:04:23 ERROR executor.Executor: Exception in task 0.0 in stage
2.0 (TID 2)
java.lang.RuntimeException: Error while encoding:
java.lang.RuntimeException: java.lang.Integer is not a valid external type
for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, Rowid),
StringType), true) AS Rowid#35
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, Rowid),
StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, Rowid),
StringType), true)
  +- validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, Rowid),
StringType)
 +- getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, Rowid)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top
level row object)
   +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, maths),
StringType), true) AS maths#36
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, maths),
StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 1
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, maths),
StringType), true)
  +- validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, maths),
StringType)
 +- getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, maths)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top
level row object)
   +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 2, english),
StringType), true) AS english#37
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 2, english),
StringType), true)

Kindly help; I am unable to work out from this error what exactly is wrong.

Thanks.
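For what it's worth, this error usually means the RDD[Row] holds a value whose runtime
type does not match the declared schema: here a java.lang.Integer in a field declared
as StringType. A small, self-contained sketch of the two ways to reconcile them, using
the spark-shell's built-in spark and sc (field names are illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative data: the second element is an Int at runtime.
val raw = sc.parallelize(Seq(("r1", 75), ("r2", 82)))

// Option 1: keep an all-string schema and convert values while building each Row.
val stringRows = raw.map { case (id, marks) => Row(id, marks.toString) }
val stringSchema = StructType(Array(
  StructField("Rowid", StringType),
  StructField("maths", StringType)))
val dfA = spark.createDataFrame(stringRows, stringSchema)

// Option 2: keep the Int and declare the column as IntegerType so data and schema agree.
val typedRows = raw.map { case (id, marks) => Row(id, marks) }
val typedSchema = StructType(Array(
  StructField("Rowid", StringType),
  StructField("maths", IntegerType)))
val dfB = spark.createDataFrame(typedRows, typedSchema)

dfA.show()
dfB.show()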


On Wed, Dec 28, 2016 at 9:00 PM, Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

> Hello Spark Community,
>
> I am reading HBase table from Spark and getting RDD but now i wants to
> convert RDD of Spark Rows and want to convert to DF.
>
> *Source Code:*
>
> bin/spark-shell --packages 
> it.nerdammer.bigdata:spark-hbase-connector_2.10:1.0.3
> --conf spark.hbase.host=127.0.0.1
>
> import it

Apache Hive with Spark Configuration

2016-12-28 Thread Chetan Khatri
Hello Users / Developers,

I am using Hive 2.0.1 with MySQL as the metastore; can you tell me which Hive version
is most compatible with Spark 2.0.2?

Thanks


Re: Negative number of active tasks

2016-12-23 Thread Chetan Khatri
Could you share pseudo-code for the same?

Cheers!

C Khatri.
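Not Andy's actual code, but a minimal sketch of what "saving multiple datasets to HDFS
from different threads" (described in the quoted message below) can look like in
Spark 2.x; the datasets and HDFS paths are placeholders:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Placeholder datasets standing in for the real ones being written.
val dfA = spark.range(0, 1000000).toDF("id")
val dfB = spark.range(0, 1000000).toDF("id")

// Each write is a separate Spark job; submitting them from different threads
// lets the scheduler run them concurrently.
val writes = Seq(
  Future { dfA.write.mode("overwrite").parquet("hdfs:///tmp/example/datasetA") },
  Future { dfB.write.mode("overwrite").parquet("hdfs:///tmp/example/datasetB") }
)

// Block until both concurrent write jobs finish.
Await.result(Future.sequence(writes), Duration.Inf)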

On Fri, Dec 23, 2016 at 4:33 PM, Andy Dang  wrote:

> Hi all,
>
> Today I hit a weird bug in Spark 2.0.2 (vanilla Spark) - the executor tab
> shows negative number of active tasks.
>
> I have about 25 jobs, each with 20k tasks so the numbers are not that
> crazy.
>
> What could possibly the cause of this bug? This is the first time I've
> seen it and the only special thing I'm doing is saving multiple datasets at
> the same time to HDFS from different threads.
>
> Thanks,
> Andy
>
>
>


Re: Approach: Incremental data load from HBASE

2016-12-23 Thread Chetan Khatri
Ted, correct. In my case I want incremental import from HBase and an incremental load
to Hive. Both approaches discussed earlier, with indexing, seem reasonable to me. But
just as Sqoop supports incremental import and load for an RDBMS, is there any tool
which supports incremental import from HBase?



On Wed, Dec 21, 2016 at 10:04 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Incremental load traditionally means generating hfiles and
> using org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load the
> data into hbase.
>
> For your use case, the producer needs to find rows where the flag is 0 or
> 1.
> After such rows are obtained, it is up to you how the result of processing
> is delivered to hbase.
>
> Cheers
>
> On Wed, Dec 21, 2016 at 8:00 AM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Ok, Sure will ask.
>>
>> But what would be generic best practice solution for Incremental load
>> from HBASE.
>>
>> On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> I haven't used Gobblin.
>>> You can consider asking Gobblin mailing list of the first option.
>>>
>>> The second option would work.
>>>
>>>
>>> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Hello Guys,
>>>>
>>>> I would like to understand different approach for Distributed
>>>> Incremental load from HBase, Is there any *tool / incubactor tool* which
>>>> satisfy requirement ?
>>>>
>>>> *Approach 1:*
>>>>
>>>> Write Kafka Producer and maintain manually column flag for events and
>>>> ingest it with Linkedin Gobblin to HDFS / S3.
>>>>
>>>> *Approach 2:*
>>>>
>>>> Run Scheduled Spark Job - Read from HBase and do transformations and
>>>> maintain flag column at HBase Level.
>>>>
>>>> In above both approach, I need to maintain column level flags. such as
>>>> 0 - by default, 1-sent,2-sent and acknowledged. So next time Producer will
>>>> take another 1000 rows of batch where flag is 0 or 1.
>>>>
>>>> I am looking for best practice approach with any distributed tool.
>>>>
>>>> Thanks.
>>>>
>>>> - Chetan Khatri
>>>>
>>>
>>>
>>
>


Re: Best Practice for Spark Job Jar Generation

2016-12-23 Thread Chetan Khatri
Correct, so there is the approach you suggested and the uber-jar approach. I think the
uber-jar approach is the better practice, because it makes environment migration
easier, and performance-wise the uber-jar approach should also be more optimised than
the uber-less approach.
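For reference, a minimal build.sbt sketch of the uber-jar route with sbt-assembly; the
project name, versions and merge rules are illustrative, not taken from a real project:

// build.sbt (assumes sbt-assembly is added in project/plugins.sbt,
// e.g. addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")).
name := "spark-hbase-job"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // Spark is provided by the cluster, so it stays out of the assembly.
  "org.apache.spark" %% "spark-sql" % "2.0.1" % "provided"
  // plus any application-only libraries that should be bundled into the fat jar
)

// Resolve files that commonly clash when many jars are merged.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}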

Thanks.

On Fri, Dec 23, 2016 at 11:41 PM, Andy Dang <nam...@gmail.com> wrote:

> We remodel Spark dependencies and ours together and chuck them under the
> /jars path. There are other ways to do it but we want the classpath to be
> strictly as close to development as possible.
>
> ---
> Regards,
> Andy
>
> On Fri, Dec 23, 2016 at 6:00 PM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Andy, Thanks for reply.
>>
>> If we download all the dependencies at separate location  and link with
>> spark job jar on spark cluster, is it best way to execute spark job ?
>>
>> Thanks.
>>
>> On Fri, Dec 23, 2016 at 8:34 PM, Andy Dang <nam...@gmail.com> wrote:
>>
>>> I used to use uber jar in Spark 1.x because of classpath issues (we
>>> couldn't re-model our dependencies based on our code, and thus cluster's
>>> run dependencies could be very different from running Spark directly in the
>>> IDE. We had to use userClasspathFirst "hack" to work around this.
>>>
>>> With Spark 2, it's easier to replace dependencies (say, Guava) than
>>> before. We moved away from deploying superjar and just pass the libraries
>>> as part of Spark jars (still can't use Guava v19 or later because Spark
>>> uses a deprecated method that's not available, but that's not a big issue
>>> for us).
>>>
>>> ---
>>> Regards,
>>> Andy
>>>
>>> On Fri, Dec 23, 2016 at 6:44 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Hello Spark Community,
>>>>
>>>> For Spark Job Creation I use SBT Assembly to build Uber("Super") Jar
>>>> and then submit to spark-submit.
>>>>
>>>> Example,
>>>>
>>>> bin/spark-submit --class hbase.spark.chetan.com.SparkHbaseJob
>>>> /home/chetan/hbase-spark/SparkMSAPoc-assembly-1.0.jar
>>>>
>>>> But other folks has debate with for Uber Less Jar, Guys can you please
>>>> explain me best practice industry standard for the same.
>>>>
>>>> Thanks,
>>>>
>>>> Chetan Khatri.
>>>>
>>>
>>>
>>
>


Dependency Injection and Microservice development with Spark

2016-12-23 Thread Chetan Khatri
Hello Community,

My current approach for Spark job development is Scala + SBT with an uber jar, using a
YAML properties file to pass configuration parameters. But if I would like to use
dependency injection and microservice-style development, like the Spring Boot
features, in Scala, then what would be the standard approach?
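One lightweight alternative to a Spring-style container in Scala is plain constructor
injection: build the SparkSession and a typed configuration object at the edge of the
application and pass them into the job class. A minimal sketch, with all names
illustrative:

import org.apache.spark.sql.SparkSession

// Typed configuration, e.g. populated from the YAML/properties file at startup.
case class JobConfig(sourcePath: String, hiveTable: String)

// The job receives its collaborators through the constructor, which keeps it easy
// to unit-test with a local SparkSession and a stub config.
class TransactionSyncJob(spark: SparkSession, config: JobConfig) {
  def run(): Unit = {
    val df = spark.read.parquet(config.sourcePath)
    df.write.mode("append").saveAsTable(config.hiveTable)
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DI example").enableHiveSupport().getOrCreate()
    val config = JobConfig(sourcePath = "/tmp/example/input", hiveTable = "default.transactions")
    new TransactionSyncJob(spark, config).run()
  }
}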

Thanks

Chetan


Best Practice for Spark Job Jar Generation

2016-12-22 Thread Chetan Khatri
Hello Spark Community,

For Spark job creation I use sbt-assembly to build an uber ("super") jar and then
submit it with spark-submit.

Example,

bin/spark-submit --class hbase.spark.chetan.com.SparkHbaseJob
/home/chetan/hbase-spark/SparkMSAPoc-assembly-1.0.jar

But other folks argue for an uber-less jar. Can you please explain the
industry-standard best practice for this?

Thanks,

Chetan Khatri.


Re: Approach: Incremental data load from HBASE

2016-12-21 Thread Chetan Khatri
OK, sure, I will ask.

But what would be a generic best-practice solution for incremental load from HBase?

On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> I haven't used Gobblin.
> You can consider asking Gobblin mailing list of the first option.
>
> The second option would work.
>
>
> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Hello Guys,
>>
>> I would like to understand different approach for Distributed Incremental
>> load from HBase, Is there any *tool / incubactor tool* which satisfy
>> requirement ?
>>
>> *Approach 1:*
>>
>> Write Kafka Producer and maintain manually column flag for events and
>> ingest it with Linkedin Gobblin to HDFS / S3.
>>
>> *Approach 2:*
>>
>> Run Scheduled Spark Job - Read from HBase and do transformations and
>> maintain flag column at HBase Level.
>>
>> In above both approach, I need to maintain column level flags. such as 0
>> - by default, 1-sent,2-sent and acknowledged. So next time Producer will
>> take another 1000 rows of batch where flag is 0 or 1.
>>
>> I am looking for best practice approach with any distributed tool.
>>
>> Thanks.
>>
>> - Chetan Khatri
>>
>
>


Approach: Incremental data load from HBASE

2016-12-21 Thread Chetan Khatri
Hello Guys,

I would like to understand the different approaches for distributed incremental load
from HBase. Is there any *tool / incubator tool* which satisfies this requirement?

*Approach 1:*

Write a Kafka producer, manually maintain a flag column for events, and ingest the
data with LinkedIn Gobblin to HDFS / S3.

*Approach 2:*

Run a scheduled Spark job: read from HBase, do transformations, and maintain the flag
column at the HBase level.

In both of the above approaches I need to maintain column-level flags, such as
0 = default, 1 = sent, 2 = sent and acknowledged, so that next time the producer takes
another batch of 1000 rows where the flag is 0 or 1.

I am looking for a best-practice approach with any distributed tool.
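For illustration only, a rough sketch of Approach 2 at the DataFrame level, assuming
the HBase table is already exposed to Spark as a table with a flag column; every name
below is a placeholder, and the HBase write-back is only noted in a comment because it
depends on the connector:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("Flag-based incremental batch")
  .enableHiveSupport().getOrCreate()

// Assumption: the HBase table is available as a Spark table with a "flag" column
// (0 = default, 1 = sent, 2 = sent and acknowledged).
val sourceDF = spark.table("staging.hbase_events")

// Pick the next batch of up to 1000 rows that are not yet acknowledged.
val batchDF = sourceDF.filter(col("flag").isin(0, 1)).limit(1000)

// Deliver the batch; appending to a Hive table here, but it could equally go to HDFS/S3.
batchDF.write.mode("append").saveAsTable("warehouse.events_delivered")

// Updating the flag to 2 back in HBase is connector-specific and is left out here;
// it would be a write of the processed row keys through whichever HBase connector is used.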

Thanks.

- Chetan Khatri