http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py 
b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 2d308c9..8decc94 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -3122,6 +3122,8 @@ class Table:
    - tableType
    - privileges
    - temporary
+   - mmNextWriteId
+   - mmWatermarkWriteId
   """
 
   thrift_spec = (
@@ -3140,9 +3142,11 @@ class Table:
     (12, TType.STRING, 'tableType', None, None, ), # 12
     (13, TType.STRUCT, 'privileges', (PrincipalPrivilegeSet, 
PrincipalPrivilegeSet.thrift_spec), None, ), # 13
     (14, TType.BOOL, 'temporary', None, False, ), # 14
+    (15, TType.I64, 'mmNextWriteId', None, None, ), # 15
+    (16, TType.I64, 'mmWatermarkWriteId', None, None, ), # 16
   )
 
-  def __init__(self, tableName=None, dbName=None, owner=None, createTime=None, 
lastAccessTime=None, retention=None, sd=None, partitionKeys=None, 
parameters=None, viewOriginalText=None, viewExpandedText=None, tableType=None, 
privileges=None, temporary=thrift_spec[14][4],):
+  def __init__(self, tableName=None, dbName=None, owner=None, createTime=None, 
lastAccessTime=None, retention=None, sd=None, partitionKeys=None, 
parameters=None, viewOriginalText=None, viewExpandedText=None, tableType=None, 
privileges=None, temporary=thrift_spec[14][4], mmNextWriteId=None, 
mmWatermarkWriteId=None,):
     self.tableName = tableName
     self.dbName = dbName
     self.owner = owner
@@ -3157,6 +3161,8 @@ class Table:
     self.tableType = tableType
     self.privileges = privileges
     self.temporary = temporary
+    self.mmNextWriteId = mmNextWriteId
+    self.mmWatermarkWriteId = mmWatermarkWriteId
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
@@ -3251,6 +3257,16 @@ class Table:
           self.temporary = iprot.readBool()
         else:
           iprot.skip(ftype)
+      elif fid == 15:
+        if ftype == TType.I64:
+          self.mmNextWriteId = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      elif fid == 16:
+        if ftype == TType.I64:
+          self.mmWatermarkWriteId = iprot.readI64()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -3324,6 +3340,14 @@ class Table:
       oprot.writeFieldBegin('temporary', TType.BOOL, 14)
       oprot.writeBool(self.temporary)
       oprot.writeFieldEnd()
+    if self.mmNextWriteId is not None:
+      oprot.writeFieldBegin('mmNextWriteId', TType.I64, 15)
+      oprot.writeI64(self.mmNextWriteId)
+      oprot.writeFieldEnd()
+    if self.mmWatermarkWriteId is not None:
+      oprot.writeFieldBegin('mmWatermarkWriteId', TType.I64, 16)
+      oprot.writeI64(self.mmWatermarkWriteId)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -3347,6 +3371,8 @@ class Table:
     value = (value * 31) ^ hash(self.tableType)
     value = (value * 31) ^ hash(self.privileges)
     value = (value * 31) ^ hash(self.temporary)
+    value = (value * 31) ^ hash(self.mmNextWriteId)
+    value = (value * 31) ^ hash(self.mmWatermarkWriteId)
     return value
 
   def __repr__(self):
@@ -12191,6 +12217,456 @@ class CacheFileMetadataRequest:
   def __ne__(self, other):
     return not (self == other)
 
+class GetNextWriteIdRequest:
+  """
+  Attributes:
+   - dbName
+   - tblName
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'dbName', None, None, ), # 1
+    (2, TType.STRING, 'tblName', None, None, ), # 2
+  )
+
+  def __init__(self, dbName=None, tblName=None,):
+    self.dbName = dbName
+    self.tblName = tblName
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, 
self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.dbName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.tblName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, 
self.thrift_spec)))
+      return
+    oprot.writeStructBegin('GetNextWriteIdRequest')
+    if self.dbName is not None:
+      oprot.writeFieldBegin('dbName', TType.STRING, 1)
+      oprot.writeString(self.dbName)
+      oprot.writeFieldEnd()
+    if self.tblName is not None:
+      oprot.writeFieldBegin('tblName', TType.STRING, 2)
+      oprot.writeString(self.tblName)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.dbName is None:
+      raise TProtocol.TProtocolException(message='Required field dbName is 
unset!')
+    if self.tblName is None:
+      raise TProtocol.TProtocolException(message='Required field tblName is 
unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.dbName)
+    value = (value * 31) ^ hash(self.tblName)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class GetNextWriteIdResult:
+  """
+  Attributes:
+   - writeId
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.I64, 'writeId', None, None, ), # 1
+  )
+
+  def __init__(self, writeId=None,):
+    self.writeId = writeId
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, 
self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.I64:
+          self.writeId = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, 
self.thrift_spec)))
+      return
+    oprot.writeStructBegin('GetNextWriteIdResult')
+    if self.writeId is not None:
+      oprot.writeFieldBegin('writeId', TType.I64, 1)
+      oprot.writeI64(self.writeId)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.writeId is None:
+      raise TProtocol.TProtocolException(message='Required field writeId is 
unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.writeId)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class FinalizeWriteIdRequest:
+  """
+  Attributes:
+   - dbName
+   - tblName
+   - writeId
+   - commit
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'dbName', None, None, ), # 1
+    (2, TType.STRING, 'tblName', None, None, ), # 2
+    (3, TType.I64, 'writeId', None, None, ), # 3
+    (4, TType.BOOL, 'commit', None, None, ), # 4
+  )
+
+  def __init__(self, dbName=None, tblName=None, writeId=None, commit=None,):
+    self.dbName = dbName
+    self.tblName = tblName
+    self.writeId = writeId
+    self.commit = commit
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, 
self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.dbName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.tblName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I64:
+          self.writeId = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.BOOL:
+          self.commit = iprot.readBool()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, 
self.thrift_spec)))
+      return
+    oprot.writeStructBegin('FinalizeWriteIdRequest')
+    if self.dbName is not None:
+      oprot.writeFieldBegin('dbName', TType.STRING, 1)
+      oprot.writeString(self.dbName)
+      oprot.writeFieldEnd()
+    if self.tblName is not None:
+      oprot.writeFieldBegin('tblName', TType.STRING, 2)
+      oprot.writeString(self.tblName)
+      oprot.writeFieldEnd()
+    if self.writeId is not None:
+      oprot.writeFieldBegin('writeId', TType.I64, 3)
+      oprot.writeI64(self.writeId)
+      oprot.writeFieldEnd()
+    if self.commit is not None:
+      oprot.writeFieldBegin('commit', TType.BOOL, 4)
+      oprot.writeBool(self.commit)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.dbName is None:
+      raise TProtocol.TProtocolException(message='Required field dbName is 
unset!')
+    if self.tblName is None:
+      raise TProtocol.TProtocolException(message='Required field tblName is 
unset!')
+    if self.writeId is None:
+      raise TProtocol.TProtocolException(message='Required field writeId is 
unset!')
+    if self.commit is None:
+      raise TProtocol.TProtocolException(message='Required field commit is 
unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.dbName)
+    value = (value * 31) ^ hash(self.tblName)
+    value = (value * 31) ^ hash(self.writeId)
+    value = (value * 31) ^ hash(self.commit)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class FinalizeWriteIdResult:
+
+  thrift_spec = (
+  )
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, 
self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, 
self.thrift_spec)))
+      return
+    oprot.writeStructBegin('FinalizeWriteIdResult')
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class HeartbeatWriteIdRequest:
+  """
+  Attributes:
+   - dbName
+   - tblName
+   - writeId
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'dbName', None, None, ), # 1
+    (2, TType.STRING, 'tblName', None, None, ), # 2
+    (3, TType.I64, 'writeId', None, None, ), # 3
+  )
+
+  def __init__(self, dbName=None, tblName=None, writeId=None,):
+    self.dbName = dbName
+    self.tblName = tblName
+    self.writeId = writeId
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, 
self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.dbName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.tblName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I64:
+          self.writeId = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, 
self.thrift_spec)))
+      return
+    oprot.writeStructBegin('HeartbeatWriteIdRequest')
+    if self.dbName is not None:
+      oprot.writeFieldBegin('dbName', TType.STRING, 1)
+      oprot.writeString(self.dbName)
+      oprot.writeFieldEnd()
+    if self.tblName is not None:
+      oprot.writeFieldBegin('tblName', TType.STRING, 2)
+      oprot.writeString(self.tblName)
+      oprot.writeFieldEnd()
+    if self.writeId is not None:
+      oprot.writeFieldBegin('writeId', TType.I64, 3)
+      oprot.writeI64(self.writeId)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.dbName is None:
+      raise TProtocol.TProtocolException(message='Required field dbName is 
unset!')
+    if self.tblName is None:
+      raise TProtocol.TProtocolException(message='Required field tblName is 
unset!')
+    if self.writeId is None:
+      raise TProtocol.TProtocolException(message='Required field writeId is 
unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.dbName)
+    value = (value * 31) ^ hash(self.tblName)
+    value = (value * 31) ^ hash(self.writeId)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class HeartbeatWriteIdResult:
+
+  thrift_spec = (
+  )
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, 
self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, 
self.thrift_spec)))
+      return
+    oprot.writeStructBegin('HeartbeatWriteIdResult')
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class GetAllFunctionsResponse:
   """
   Attributes:

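The generated Python changes above mirror the new Thrift definitions: the Table struct gains two optional i64 fields, mmNextWriteId and mmWatermarkWriteId, and new request/result structs are added for the write-ID RPCs. The same fields are exposed through the generated Java API, which the metastore server code further down relies on. A minimal Java sketch of how those optional-field accessors behave (illustrative helper, not part of the patch):

import org.apache.hadoop.hive.metastore.api.Table;

// Sketch only: the isSet/get/set accessors generated for the new optional Table fields.
public class MmTableFieldsExample {
  public static long reserveNextWriteId(Table tbl) {
    // An unset optional field means no MM write ID was ever allocated; treat it as 0.
    long writeId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0;
    tbl.setMmNextWriteId(writeId + 1); // reserve the ID by bumping the counter
    return writeId;
  }
}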
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb 
b/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
index 6aa7143..118a54e 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
@@ -55,3 +55,5 @@ TABLE_NO_AUTO_COMPACT = %q"no_auto_compaction"
 
 TABLE_TRANSACTIONAL_PROPERTIES = %q"transactional_properties"
 
+TABLE_IS_MM = %q"hivecommit"
+

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb 
b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index bd94e98..95f2075 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -728,6 +728,8 @@ class Table
   TABLETYPE = 12
   PRIVILEGES = 13
   TEMPORARY = 14
+  MMNEXTWRITEID = 15
+  MMWATERMARKWRITEID = 16
 
   FIELDS = {
     TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
@@ -743,7 +745,9 @@ class Table
     VIEWEXPANDEDTEXT => {:type => ::Thrift::Types::STRING, :name => 
'viewExpandedText'},
     TABLETYPE => {:type => ::Thrift::Types::STRING, :name => 'tableType'},
     PRIVILEGES => {:type => ::Thrift::Types::STRUCT, :name => 'privileges', 
:class => ::PrincipalPrivilegeSet, :optional => true},
-    TEMPORARY => {:type => ::Thrift::Types::BOOL, :name => 'temporary', 
:default => false, :optional => true}
+    TEMPORARY => {:type => ::Thrift::Types::BOOL, :name => 'temporary', 
:default => false, :optional => true},
+    MMNEXTWRITEID => {:type => ::Thrift::Types::I64, :name => 'mmNextWriteId', 
:optional => true},
+    MMWATERMARKWRITEID => {:type => ::Thrift::Types::I64, :name => 
'mmWatermarkWriteId', :optional => true}
   }
 
   def struct_fields; FIELDS; end
@@ -2756,6 +2760,122 @@ class CacheFileMetadataRequest
   ::Thrift::Struct.generate_accessors self
 end
 
+class GetNextWriteIdRequest
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+  DBNAME = 1
+  TBLNAME = 2
+
+  FIELDS = {
+    DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
+    TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise 
::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required 
field dbName is unset!') unless @dbName
+    raise 
::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required 
field tblName is unset!') unless @tblName
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
+class GetNextWriteIdResult
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+  WRITEID = 1
+
+  FIELDS = {
+    WRITEID => {:type => ::Thrift::Types::I64, :name => 'writeId'}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise 
::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required 
field writeId is unset!') unless @writeId
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
+class FinalizeWriteIdRequest
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+  DBNAME = 1
+  TBLNAME = 2
+  WRITEID = 3
+  COMMIT = 4
+
+  FIELDS = {
+    DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
+    TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'},
+    WRITEID => {:type => ::Thrift::Types::I64, :name => 'writeId'},
+    COMMIT => {:type => ::Thrift::Types::BOOL, :name => 'commit'}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise 
::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required 
field dbName is unset!') unless @dbName
+    raise 
::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required 
field tblName is unset!') unless @tblName
+    raise 
::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required 
field writeId is unset!') unless @writeId
+    raise 
::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required 
field commit is unset!') if @commit.nil?
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
+class FinalizeWriteIdResult
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+
+  FIELDS = {
+
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
+class HeartbeatWriteIdRequest
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+  DBNAME = 1
+  TBLNAME = 2
+  WRITEID = 3
+
+  FIELDS = {
+    DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
+    TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'},
+    WRITEID => {:type => ::Thrift::Types::I64, :name => 'writeId'}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise 
::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required 
field dbName is unset!') unless @dbName
+    raise 
::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required 
field tblName is unset!') unless @tblName
+    raise 
::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required 
field writeId is unset!') unless @writeId
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
+class HeartbeatWriteIdResult
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+
+  FIELDS = {
+
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
 class GetAllFunctionsResponse
   include ::Thrift::Struct, ::Thrift::Struct_Union
   FUNCTIONS = 1

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb 
b/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index 51f65c6..403e07f 100644
--- a/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -2484,6 +2484,51 @@ module ThriftHiveMetastore
       raise 
::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT,
 'cache_file_metadata failed: unknown result')
     end
 
+    def get_next_write_id(req)
+      send_get_next_write_id(req)
+      return recv_get_next_write_id()
+    end
+
+    def send_get_next_write_id(req)
+      send_message('get_next_write_id', Get_next_write_id_args, :req => req)
+    end
+
+    def recv_get_next_write_id()
+      result = receive_message(Get_next_write_id_result)
+      return result.success unless result.success.nil?
+      raise 
::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT,
 'get_next_write_id failed: unknown result')
+    end
+
+    def finalize_write_id(req)
+      send_finalize_write_id(req)
+      return recv_finalize_write_id()
+    end
+
+    def send_finalize_write_id(req)
+      send_message('finalize_write_id', Finalize_write_id_args, :req => req)
+    end
+
+    def recv_finalize_write_id()
+      result = receive_message(Finalize_write_id_result)
+      return result.success unless result.success.nil?
+      raise 
::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT,
 'finalize_write_id failed: unknown result')
+    end
+
+    def heartbeat_write_id(req)
+      send_heartbeat_write_id(req)
+      return recv_heartbeat_write_id()
+    end
+
+    def send_heartbeat_write_id(req)
+      send_message('heartbeat_write_id', Heartbeat_write_id_args, :req => req)
+    end
+
+    def recv_heartbeat_write_id()
+      result = receive_message(Heartbeat_write_id_result)
+      return result.success unless result.success.nil?
+      raise 
::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT,
 'heartbeat_write_id failed: unknown result')
+    end
+
   end
 
   class Processor < ::FacebookService::Processor 
@@ -4330,6 +4375,27 @@ module ThriftHiveMetastore
       write_result(result, oprot, 'cache_file_metadata', seqid)
     end
 
+    def process_get_next_write_id(seqid, iprot, oprot)
+      args = read_args(iprot, Get_next_write_id_args)
+      result = Get_next_write_id_result.new()
+      result.success = @handler.get_next_write_id(args.req)
+      write_result(result, oprot, 'get_next_write_id', seqid)
+    end
+
+    def process_finalize_write_id(seqid, iprot, oprot)
+      args = read_args(iprot, Finalize_write_id_args)
+      result = Finalize_write_id_result.new()
+      result.success = @handler.finalize_write_id(args.req)
+      write_result(result, oprot, 'finalize_write_id', seqid)
+    end
+
+    def process_heartbeat_write_id(seqid, iprot, oprot)
+      args = read_args(iprot, Heartbeat_write_id_args)
+      result = Heartbeat_write_id_result.new()
+      result.success = @handler.heartbeat_write_id(args.req)
+      write_result(result, oprot, 'heartbeat_write_id', seqid)
+    end
+
   end
 
   # HELPER FUNCTIONS AND STRUCTURES
@@ -9930,5 +9996,101 @@ module ThriftHiveMetastore
     ::Thrift::Struct.generate_accessors self
   end
 
+  class Get_next_write_id_args
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    REQ = 1
+
+    FIELDS = {
+      REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => 
::GetNextWriteIdRequest}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class Get_next_write_id_result
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    SUCCESS = 0
+
+    FIELDS = {
+      SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class 
=> ::GetNextWriteIdResult}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class Finalize_write_id_args
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    REQ = 1
+
+    FIELDS = {
+      REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => 
::FinalizeWriteIdRequest}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class Finalize_write_id_result
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    SUCCESS = 0
+
+    FIELDS = {
+      SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class 
=> ::FinalizeWriteIdResult}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class Heartbeat_write_id_args
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    REQ = 1
+
+    FIELDS = {
+      REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => 
::HeartbeatWriteIdRequest}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class Heartbeat_write_id_result
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    SUCCESS = 0
+
+    FIELDS = {
+      SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class 
=> ::HeartbeatWriteIdResult}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
 end
 

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 3f85ca6..f99bcd2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -48,7 +48,6 @@ import 
org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
@@ -81,6 +80,7 @@ import 
org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
 import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler;
+import org.apache.hadoop.hive.metastore.model.MTableWrite;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -181,11 +181,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     };
   };
 
-  /**
-   * default port on which to start the Hive server
-   */
   public static final String ADMIN = "admin";
   public static final String PUBLIC = "public";
+  /** MM write states. */
+  public static final char MM_WRITE_OPEN = 'o', MM_WRITE_COMMITTED = 'c', 
MM_WRITE_ABORTED = 'a';
 
   private static HadoopThriftAuthBridge.Server saslServer;
   private static HiveDelegationTokenManager delegationTokenManager;
@@ -1253,13 +1252,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
       } catch (Exception e) {
         ex = e;
-        if (e instanceof MetaException) {
-          throw (MetaException) e;
-        } else if (e instanceof NoSuchObjectException) {
-          throw (NoSuchObjectException) e;
-        } else {
-          throw newMetaException(e);
-        }
+        throwMetaException(e);
       } finally {
         endFunction("get_type", ret != null, ex);
       }
@@ -1302,13 +1295,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         success = getMS().dropType(name);
       } catch (Exception e) {
         ex = e;
-        if (e instanceof MetaException) {
-          throw (MetaException) e;
-        } else if (e instanceof NoSuchObjectException) {
-          throw (NoSuchObjectException) e;
-        } else {
-          throw newMetaException(e);
-        }
+        throwMetaException(e);
       } finally {
         endFunction("drop_type", success, ex);
       }
@@ -1863,13 +1850,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         throw new MetaException(e.getMessage());
       } catch (Exception e) {
         ex = e;
-        if (e instanceof MetaException) {
-          throw (MetaException) e;
-        } else if (e instanceof NoSuchObjectException) {
-          throw (NoSuchObjectException) e;
-        } else {
-          throw newMetaException(e);
-        }
+        throwMetaException(e);
       } finally {
         endFunction("drop_table", success, ex, name);
       }
@@ -1941,7 +1922,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
      */
     public Table get_table_core(final String dbname, final String name) throws 
MetaException,
         NoSuchObjectException {
-      Table t;
+      Table t = null;
       try {
         t = getMS().getTable(dbname, name);
         if (t == null) {
@@ -1949,13 +1930,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               + " table not found");
         }
       } catch (Exception e) {
-        if (e instanceof MetaException) {
-          throw (MetaException) e;
-        } else if (e instanceof NoSuchObjectException) {
-          throw (NoSuchObjectException) e;
-        } else {
-          throw newMetaException(e);
-        }
+        throwMetaException(e);
       }
       return t;
     }
@@ -3116,13 +3091,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         ret = getMS().getPartition(db_name, tbl_name, part_vals);
       } catch (Exception e) {
         ex = e;
-        if (e instanceof MetaException) {
-          throw (MetaException) e;
-        } else if (e instanceof NoSuchObjectException) {
-          throw (NoSuchObjectException) e;
-        } else {
-          throw newMetaException(e);
-        }
+        throwMetaException(e);
       } finally {
         endFunction("get_partition", ret != null, ex, tbl_name);
       }
@@ -3188,13 +3157,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         ret = getMS().getPartitions(db_name, tbl_name, max_parts);
       } catch (Exception e) {
         ex = e;
-        if (e instanceof MetaException) {
-          throw (MetaException) e;
-        } else if (e instanceof NoSuchObjectException) {
-          throw (NoSuchObjectException) e;
-        } else {
-          throw newMetaException(e);
-        }
+        throwMetaException(e);
       } finally {
         endFunction("get_partitions", ret != null, ex, tbl_name);
       }
@@ -6443,13 +6406,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         ret = getMS().getPrimaryKeys(db_name, tbl_name);
       } catch (Exception e) {
        ex = e;
-       if (e instanceof MetaException) {
-         throw (MetaException) e;
-       } else if (e instanceof NoSuchObjectException) {
-         throw (NoSuchObjectException) e;
-       } else {
-         throw newMetaException(e);
-       }
+       throwMetaException(e);
      } finally {
        endFunction("get_primary_keys", ret != null, ex, tbl_name);
      }
@@ -6473,18 +6430,142 @@ public class HiveMetaStore extends ThriftHiveMetastore 
{
               foreign_db_name, foreign_tbl_name);
       } catch (Exception e) {
         ex = e;
-        if (e instanceof MetaException) {
-          throw (MetaException) e;
-        } else if (e instanceof NoSuchObjectException) {
-          throw (NoSuchObjectException) e;
-        } else {
-          throw newMetaException(e);
-        }
+        throwMetaException(e);
       } finally {
         endFunction("get_foreign_keys", ret != null, ex, foreign_tbl_name);
       }
       return new ForeignKeysResponse(ret);
     }
+
+    private void throwMetaException(Exception e) throws MetaException,
+        NoSuchObjectException {
+      if (e instanceof MetaException) {
+        throw (MetaException) e;
+      } else if (e instanceof NoSuchObjectException) {
+        throw (NoSuchObjectException) e;
+      } else {
+        throw newMetaException(e);
+      }
+    }
+
+    @Override
+    public GetNextWriteIdResult get_next_write_id(GetNextWriteIdRequest req) 
throws TException {
+      RawStore ms = getMS();
+      String dbName = req.getDbName(), tblName = req.getTblName();
+      startFunction("get_next_write_id", " : db=" + dbName + " tbl=" + 
tblName);
+      Exception ex = null;
+      long writeId = -1;
+      // TODO# see TXN about how to handle conflicts
+      try {
+        boolean ok = false;
+        ms.openTransaction();
+        try {
+          Table tbl = ms.getTable(dbName, tblName);
+          if (tbl == null) {
+            throw new NoSuchObjectException(dbName + "." + tblName);
+          }
+          writeId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0;
+          tbl.setMmNextWriteId(writeId + 1);
+          ms.alterTable(dbName, tblName, tbl);
+          ok = true;
+        } finally {
+          commitOrRollback(ms, ok);
+        }
+        // Do a separate txn after we have reserved the number. TODO: If we 
fail, ignore on read.
+        ok = false;
+        ms.openTransaction();
+        try {
+          Table tbl = ms.getTable(dbName, tblName);
+          ms.createTableWrite(tbl, writeId, MM_WRITE_OPEN, 
System.currentTimeMillis());
+          ok = true;
+        } finally {
+          commitOrRollback(ms, ok);
+        }
+      } catch (Exception e) {
+        ex = e;
+        throwMetaException(e);
+      } finally {
+        endFunction("get_next_write_id", ex == null, ex, tblName);
+      }
+      return new GetNextWriteIdResult(writeId);
+    }
+
+    @Override
+    public FinalizeWriteIdResult finalize_write_id(FinalizeWriteIdRequest req) 
throws TException {
+      RawStore ms = getMS();
+      String dbName = req.getDbName(), tblName = req.getTblName();
+      long writeId = req.getWriteId();
+      boolean commit = req.isCommit();
+      startFunction("finalize_write_id", " : db=" + dbName + " tbl=" + tblName
+          + " writeId=" + writeId + " commit=" + commit);
+      Exception ex = null;
+      try {
+        boolean ok = false;
+        ms.openTransaction();
+        try {
+          MTableWrite tw = getActiveTableWrite(ms, dbName, tblName, writeId);
+          tw.setState(String.valueOf(commit ? MM_WRITE_COMMITTED : 
MM_WRITE_ABORTED));
+          ms.updateTableWrite(tw);
+          ok = true;
+        } finally {
+          commitOrRollback(ms, ok);
+        }
+      } catch (Exception e) {
+        ex = e;
+        throwMetaException(e);
+      } finally {
+        endFunction("finalize_write_id", ex == null, ex, tblName);
+      }
+      return new FinalizeWriteIdResult();
+    }
+
+    private void commitOrRollback(RawStore ms, boolean ok) throws 
MetaException {
+      if (ok) {
+        if (!ms.commitTransaction()) throw new MetaException("Failed to 
commit");
+      } else {
+        ms.rollbackTransaction();
+      }
+    }
+
+    @Override
+    public HeartbeatWriteIdResult heartbeat_write_id(HeartbeatWriteIdRequest 
req)
+        throws TException {
+      RawStore ms = getMS();
+      String dbName = req.getDbName(), tblName = req.getTblName();
+      long writeId = req.getWriteId();
+      startFunction("heartbeat_write_id", " : db="
+          + dbName + " tbl=" + tblName + " writeId=" + writeId);
+      Exception ex = null;
+      try {
+        boolean ok = false;
+        ms.openTransaction();
+        try {
+          MTableWrite tw = getActiveTableWrite(ms, dbName, tblName, writeId);
+          tw.setLastHeartbeat(System.currentTimeMillis());
+          ms.updateTableWrite(tw);
+          ok = true;
+        } finally {
+          commitOrRollback(ms, ok);
+        }
+      } catch (Exception e) {
+        ex = e;
+        throwMetaException(e);
+      } finally {
+        endFunction("heartbeat_write_id", ex == null, ex, tblName);
+      }
+      return new HeartbeatWriteIdResult();
+    }
+
+    private MTableWrite getActiveTableWrite(RawStore ms, String dbName,
+        String tblName, long writeId) throws MetaException {
+      MTableWrite tw = ms.getTableWrite(dbName, tblName, writeId);
+      assert tw.getState().length() == 1;
+      char state = tw.getState().charAt(0);
+      if (state != MM_WRITE_OPEN) {
+        throw new MetaException("Invalid write state to finalize: " + state);
+      }
+      return tw;
+    }
   }
 
 

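The three handler methods above define a per-table write-ID lifecycle: get_next_write_id reserves an ID and records a table write in the open ('o') state, heartbeat_write_id refreshes its lastHeartbeat timestamp, and finalize_write_id moves an open write to committed ('c') or aborted ('a'); getActiveTableWrite rejects any transition that does not start from the open state. A minimal sketch of that state machine using the MM_WRITE_* values defined above (the helper name is illustrative, not part of the patch):

// Sketch of the state transitions enforced by getActiveTableWrite()/finalize_write_id().
final class MmWriteStateSketch {
  static final char OPEN = 'o', COMMITTED = 'c', ABORTED = 'a';

  // Only an open write may be heartbeated or finalized; anything else is an error.
  static char finalize(char current, boolean commit) {
    if (current != OPEN) {
      throw new IllegalStateException("Invalid write state to finalize: " + current);
    }
    return commit ? COMMITTED : ABORTED;
  }
}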
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 909d8eb..6bd6d92 100644
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -27,104 +27,7 @@ import 
org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveConfUtil;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
-import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
-import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
-import org.apache.hadoop.hive.metastore.api.AddForeignKeyRequest;
-import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest;
-import org.apache.hadoop.hive.metastore.api.AddPartitionsResult;
-import org.apache.hadoop.hive.metastore.api.AddPrimaryKeyRequest;
-import org.apache.hadoop.hive.metastore.api.AggrStats;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.CacheFileMetadataRequest;
-import org.apache.hadoop.hive.metastore.api.CacheFileMetadataResult;
-import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
-import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.DropConstraintRequest;
-import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr;
-import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
-import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.FireEventRequest;
-import org.apache.hadoop.hive.metastore.api.FireEventResponse;
-import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
-import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest;
-import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult;
-import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
-import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
-import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
-import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
-import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
-import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
-import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest;
-import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeResponse;
-import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleRequest;
-import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleResponse;
-import org.apache.hadoop.hive.metastore.api.GrantRevokeType;
-import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
-import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
-import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
-import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.metastore.api.InvalidInputException;
-import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
-import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.PartitionEventType;
-import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
-import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
-import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
-import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
-import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
-import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest;
-import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
-import org.apache.hadoop.hive.metastore.api.Role;
-import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
-import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
-import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TableMeta;
-import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
-import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnOpenException;
-import org.apache.hadoop.hive.metastore.api.Type;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
-import org.apache.hadoop.hive.metastore.api.UnknownTableException;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -2484,4 +2387,21 @@ public class HiveMetaStoreClient implements 
IMetaStoreClient {
     CacheFileMetadataResult result = client.cache_file_metadata(req);
     return result.isIsSupported();
   }
+
+  @Override
+  public long getNextTableWriteId(String dbName, String tableName) throws 
TException {
+    return client.get_next_write_id(new GetNextWriteIdRequest(dbName, 
tableName)).getWriteId();
+  }
+
+  @Override
+  public void finalizeTableWrite(
+      String dbName, String tableName, long writeId, boolean commit) throws 
TException {
+    client.finalize_write_id(new FinalizeWriteIdRequest(dbName, tableName, 
writeId, commit));
+  }
+
+  @Override
+  public void heartbeatTableWrite(
+      String dbName, String tableName, long writeId) throws TException {
+    client.heartbeat_write_id(new HeartbeatWriteIdRequest(dbName, tableName, 
writeId));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 8dc4e28..f5d611d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -1619,4 +1619,11 @@ public interface IMetaStoreClient {
 
   void addForeignKey(List<SQLForeignKey> foreignKeyCols) throws
   MetaException, NoSuchObjectException, TException;
+
+  long getNextTableWriteId(String dbName, String tableName) throws TException;
+
+  void heartbeatTableWrite(String dbName, String tableName, long writeId) 
throws TException;
+
+  void finalizeTableWrite(String dbName, String tableName, long writeId,
+      boolean commit) throws TException;
 }

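Together, the three new IMetaStoreClient methods give a writer a simple protocol: reserve a write ID before writing, heartbeat it while the write is in progress, and finalize it with commit=true on success or commit=false on failure. A minimal usage sketch against the interface above (the client instance, database, and table names are placeholders):

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.thrift.TException;

// Illustrative only: how a writer might drive the new write-ID calls.
public class MmWriteExample {
  public static void runWrite(IMetaStoreClient msc, String db, String tbl) throws TException {
    long writeId = msc.getNextTableWriteId(db, tbl); // reserve an ID; the write happens under this ID
    boolean committed = false;
    try {
      // ... write data for this write ID, periodically calling:
      msc.heartbeatTableWrite(db, tbl, writeId);     // keep the open write from going stale
      committed = true;
    } finally {
      msc.finalizeTableWrite(db, tbl, writeId, committed); // 'c' on success, 'a' on failure
    }
  }
}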
http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 83a3e39..9dc80b1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -142,6 +142,7 @@ import org.apache.hadoop.hive.metastore.model.MTable;
 import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
 import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
 import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
+import org.apache.hadoop.hive.metastore.model.MTableWrite;
 import org.apache.hadoop.hive.metastore.model.MType;
 import org.apache.hadoop.hive.metastore.model.MVersionTable;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
@@ -1053,6 +1054,11 @@ public class ObjectStore implements RawStore, 
Configurable {
           pm.deletePersistentAll(tabConstraints);
         }
 
+        List<MTableWrite> tableWrites = listAllTableWrites(dbName, tableName);
+        if (tableWrites != null && tableWrites.size() > 0) {
+          pm.deletePersistentAll(tableWrites);
+        }
+
         preDropStorageDescriptor(tbl.getSd());
         // then remove the table
         pm.deletePersistentAll(tbl);
@@ -1108,7 +1114,33 @@ public class ObjectStore implements RawStore, 
Configurable {
     return mConstraints;
   }
 
-@Override
+
+  private List<MTableWrite> listAllTableWrites(String dbName, String 
tableName) {
+    List<MTableWrite> result = null;
+    Query query = null;
+    boolean success = false;
+    openTransaction();
+    try {
+      String queryStr = "table.tableName == t1 && table.database.name == t2";
+      query = pm.newQuery(MTableWrite.class, queryStr);
+      query.declareParameters("java.lang.String t1, java.lang.String t2");
+      result = new ArrayList<>((List<MTableWrite>) 
query.executeWithArray(tableName, dbName));
+      pm.retrieveAll(result);
+      success = true;
+    } finally {
+      if (success) {
+        commitTransaction();
+      } else {
+        rollbackTransaction();
+      }
+      if (query != null) {
+        query.closeAll();
+      }
+    }
+    return result;
+  }
+
+  @Override
   public Table getTable(String dbName, String tableName) throws MetaException {
     boolean commited = false;
     Table tbl = null;
@@ -1410,11 +1442,14 @@ public class ObjectStore implements RawStore, 
Configurable {
         tableType = TableType.MANAGED_TABLE.toString();
       }
     }
-    return new Table(mtbl.getTableName(), mtbl.getDatabase().getName(), mtbl
+    Table t = new Table(mtbl.getTableName(), mtbl.getDatabase().getName(), mtbl
         .getOwner(), mtbl.getCreateTime(), mtbl.getLastAccessTime(), mtbl
         .getRetention(), convertToStorageDescriptor(mtbl.getSd()),
         convertToFieldSchemas(mtbl.getPartitionKeys()), 
convertMap(mtbl.getParameters()),
         mtbl.getViewOriginalText(), mtbl.getViewExpandedText(), tableType);
+    t.setMmNextWriteId(mtbl.getMmNextWriteId());
+    t.setMmWatermarkWriteId(mtbl.getMmWatermarkWriteId());
+    return t;
   }
 
   private MTable convertToMTable(Table tbl) throws InvalidObjectException,
@@ -1452,7 +1487,8 @@ public class ObjectStore implements RawStore, 
Configurable {
         .getCreateTime(), tbl.getLastAccessTime(), tbl.getRetention(),
         convertToMFieldSchemas(tbl.getPartitionKeys()), tbl.getParameters(),
         tbl.getViewOriginalText(), tbl.getViewExpandedText(),
-        tableType);
+        tableType, tbl.isSetMmNextWriteId() ?  tbl.getMmNextWriteId() : -1,
+            tbl.isSetMmWatermarkWriteId() ?  tbl.getMmWatermarkWriteId() : -1);
   }
 
   private List<MFieldSchema> convertToMFieldSchemas(List<FieldSchema> keys) {
@@ -3218,6 +3254,8 @@ public class ObjectStore implements RawStore, 
Configurable {
       oldt.setLastAccessTime(newt.getLastAccessTime());
       oldt.setViewOriginalText(newt.getViewOriginalText());
       oldt.setViewExpandedText(newt.getViewExpandedText());
+      oldt.setMmNextWriteId(newt.getMmNextWriteId());
+      oldt.setMmWatermarkWriteId(newt.getMmWatermarkWriteId());
 
       // commit the changes
       success = commitTransaction();
@@ -8613,4 +8651,76 @@ public class ObjectStore implements RawStore, 
Configurable {
     }
   }
 
+  @Override
+  public void createTableWrite(Table tbl, long writeId, char state, long 
heartbeat) {
+    boolean success = false;
+    openTransaction();
+    try {
+      MTable mtbl = getMTable(tbl.getDbName(), tbl.getTableName());
+      MTableWrite tw = new MTableWrite(mtbl, writeId, String.valueOf(state), 
heartbeat);
+      pm.makePersistent(tw);
+      success = true;
+    } finally {
+      if (success) {
+        commitTransaction();
+      } else {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  @Override
+  public void updateTableWrite(MTableWrite tw) {
+    boolean success = false;
+    openTransaction();
+    try {
+      pm.makePersistent(tw);
+      success = true;
+    } finally {
+      if (success) {
+        commitTransaction();
+      } else {
+        rollbackTransaction();
+      }
+    }
+  }
+
+  @Override
+  public MTableWrite getTableWrite(
+      String dbName, String tblName, long writeId) throws MetaException {
+    boolean success = false;
+    Query query = null;
+    try {
+      openTransaction();
+      dbName = HiveStringUtils.normalizeIdentifier(dbName);
+      tblName = HiveStringUtils.normalizeIdentifier(tblName);
+      MTable mtbl = getMTable(dbName, tblName);
+      if (mtbl == null) {
+        success = true;
+        return null;
+      }
+      query = pm.newQuery(MTableWrite.class,
+              "table.tableName == t1 && table.database.name == t2 && writeId == t3");
+      query.declareParameters("java.lang.String t1, java.lang.String t2, java.lang.Long t3");
+      List<MTableWrite> writes = (List<MTableWrite>) query.execute(tblName, dbName, writeId);
+      pm.retrieveAll(writes);
+      success = true;
+      if (writes == null || writes.isEmpty()) return null;
+      if (writes.size() > 1) {
+        throw new MetaException(
+            "More than one TableWrite for " + dbName + "." + tblName + " and " + writeId);
+      }
+      return writes.get(0);
+    } finally {
+      if (success) {
+        commitTransaction();
+      } else {
+        rollbackTransaction();
+      }
+      if (query != null) {
+        query.closeAll();
+      }
+    }
+  }
+
 }
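
For context, a minimal usage sketch of the new write-tracking calls on RawStore/ObjectStore. Only createTableWrite, getTableWrite, updateTableWrite and MTableWrite come from this patch; the wrapper class, the 'o'/'c' state characters and the pre-existing store/tbl objects are illustrative assumptions, not values the patch defines:

  import org.apache.hadoop.hive.metastore.RawStore;
  import org.apache.hadoop.hive.metastore.api.MetaException;
  import org.apache.hadoop.hive.metastore.api.Table;
  import org.apache.hadoop.hive.metastore.model.MTableWrite;

  public class TableWriteTrackingSketch {
    // Illustrative state markers; the patch stores a single CHAR(1) but does not define values here.
    private static final char OPEN = 'o';
    private static final char COMMITTED = 'c';

    public static void recordAndCommit(RawStore store, Table tbl, long writeId) throws MetaException {
      // Register the write as open, stamped with the current time as its first heartbeat.
      store.createTableWrite(tbl, writeId, OPEN, System.currentTimeMillis());

      // ... query tasks write their files under the mm_<writeId>_ prefix ...

      // Later, look the write up and flip its state; updateTableWrite persists the change.
      MTableWrite tw = store.getTableWrite(tbl.getDbName(), tbl.getTableName(), writeId);
      if (tw != null) {
        tw.setState(String.valueOf(COMMITTED));
        tw.setLastHeartbeat(System.currentTimeMillis());
        store.updateTableWrite(tw);
      }
    }
  }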

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index bbd47b8..c5359cf 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.model.MTableWrite;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.thrift.TException;
 
@@ -680,4 +681,10 @@ public interface RawStore extends Configurable {
  void addPrimaryKeys(List<SQLPrimaryKey> pks) throws InvalidObjectException, MetaException;

  void addForeignKeys(List<SQLForeignKey> fks) throws InvalidObjectException, MetaException;
+
+  void updateTableWrite(MTableWrite tw);
+
+  MTableWrite getTableWrite(String dbName, String tblName, long writeId) throws MetaException;
+
+  void createTableWrite(Table tbl, long writeId, char state, long heartbeat);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index c65c7a4..4fbeb9e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PlanResult;
 import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan;
+import org.apache.hadoop.hive.metastore.model.MTableWrite;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -2722,4 +2723,22 @@ public class HBaseStore implements RawStore {
     throws InvalidObjectException, MetaException {
     // TODO: WTF?
   }
+
+  @Override
+  public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) {
+    // TODO: Auto-generated method stub
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void updateTableWrite(MTableWrite tw) {
+    // TODO: Auto-generated method stub
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public MTableWrite getTableWrite(String dbName, String tblName, long writeId) {
+    // TODO: Auto-generated method stub
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTable.java
----------------------------------------------------------------------
diff --git a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTable.java b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTable.java
index 2a78ce9..51c62e3 100644
--- a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTable.java
+++ b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTable.java
@@ -35,6 +35,8 @@ public class MTable {
   private String viewOriginalText;
   private String viewExpandedText;
   private String tableType;
+  private long mmNextWriteId;
+  private long mmWatermarkWriteId;
 
   public MTable() {}
 
@@ -55,7 +57,8 @@ public class MTable {
  public MTable(String tableName, MDatabase database, MStorageDescriptor sd, String owner,
      int createTime, int lastAccessTime, int retention, List<MFieldSchema> partitionKeys,
       Map<String, String> parameters,
-      String viewOriginalText, String viewExpandedText, String tableType) {
+      String viewOriginalText, String viewExpandedText, String tableType, long mmNextWriteId,
+      long mmWatermarkWriteId) {
     this.tableName = tableName;
     this.database = database;
     this.sd = sd;
@@ -68,6 +71,8 @@ public class MTable {
     this.viewOriginalText = viewOriginalText;
     this.viewExpandedText = viewExpandedText;
     this.tableType = tableType;
+    this.mmWatermarkWriteId = mmWatermarkWriteId;
+    this.mmNextWriteId = mmNextWriteId;
   }
 
   /**
@@ -237,4 +242,20 @@ public class MTable {
   public String getTableType() {
     return tableType;
   }
+
+  public long getMmNextWriteId() {
+    return mmNextWriteId;
+  }
+
+  public long getMmWatermarkWriteId() {
+    return mmWatermarkWriteId;
+  }
+
+  public void setMmNextWriteId(long mmNextWriteId) {
+    this.mmNextWriteId = mmNextWriteId;
+  }
+
+  public void setMmWatermarkWriteId(long mmWatermarkWriteId) {
+    this.mmWatermarkWriteId = mmWatermarkWriteId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
----------------------------------------------------------------------
diff --git a/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
new file mode 100644
index 0000000..a7e5f3e
--- /dev/null
+++ b/metastore/src/model/org/apache/hadoop/hive/metastore/model/MTableWrite.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.model;
+
+public class MTableWrite {
+  private MTable table;
+  private long writeId;
+  private String state;
+  private long lastHeartbeat;
+
+  public MTableWrite() {}
+
+  public MTableWrite(MTable table, long writeId, String state, long lastHeartbeat) {
+    this.table = table;
+    this.writeId = writeId;
+    this.state = state;
+    this.lastHeartbeat = lastHeartbeat;
+  }
+
+  public MTable getTable() {
+    return table;
+  }
+
+  public long getWriteId() {
+    return writeId;
+  }
+
+  public String getState() {
+    return state;
+  }
+
+  public long getLastHeartbeat() {
+    return lastHeartbeat;
+  }
+
+  public void setTable(MTable table) {
+    this.table = table;
+  }
+
+  public void setWriteId(long writeId) {
+    this.writeId = writeId;
+  }
+
+  public void setState(String state) {
+    this.state = state;
+  }
+
+  public void setLastHeartbeat(long lastHeartbeat) {
+    this.lastHeartbeat = lastHeartbeat;
+  }
+}
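
MTableWrite is a plain JDO bean: one row per (table, write id), a one-character state and a heartbeat timestamp. A minimal sketch of refreshing that heartbeat with the accessors above; persisting the change back via RawStore.updateTableWrite is assumed to be the caller's job, and the wrapper class is illustrative:

  import org.apache.hadoop.hive.metastore.model.MTableWrite;

  public class MTableWriteHeartbeatSketch {
    /** Bumps the heartbeat of an in-flight write; returns false if there is no such write. */
    public static boolean touch(MTableWrite write, long nowMillis) {
      if (write == null) {
        return false;
      }
      write.setLastHeartbeat(nowMillis); // caller persists via RawStore.updateTableWrite(write)
      return true;
    }
  }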

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/model/package.jdo
----------------------------------------------------------------------
diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo
index bfd6ddd..5126556 100644
--- a/metastore/src/model/package.jdo
+++ b/metastore/src/model/package.jdo
@@ -182,6 +182,12 @@
       <field name="tableType">
         <column name="TBL_TYPE" length="128" jdbc-type="VARCHAR"/>
       </field>
+      <field name="mmNextWriteId">
+        <column name="MM_NEXT_WRITE_ID" jdbc-type="BIGINT"/>
+      </field>
+      <field name="mmWatermarkWriteId">
+        <column name="MM_WATERMARK_WRITE_ID" jdbc-type="BIGINT"/>
+      </field>
     </class>
 
    <class name="MConstraint" identity-type="application" table="KEY_CONSTRAINTS" detachable="true" objectid-class="MConstraint$PK">
@@ -1058,6 +1064,29 @@
       </field>
     </class>
 
+    <!-- using datastore identity here, cause composite application PKs are a PITA -->
+    <class name="MTableWrite" table="TBL_WRITES" identity-type="datastore" detachable="true">
+      <datastore-identity>
+        <column name="TW_ID"/>
+      </datastore-identity>
+      <index name="UniqueWrite" unique="true">
+        <column name="TBL_ID"/>
+        <column name="WRITE_ID"/>
+      </index>
+      <field name="writeId">
+        <column name="WRITE_ID" jdbc-type="BIGINT" allows-null="false"/>
+      </field>
+      <field name="table">
+        <column name="TBL_ID"/>
+      </field>
+      <field name="state">
+        <column name="STATE" length="1" jdbc-type="CHAR" allows-null="false"/>
+      </field>
+      <field name="lastHeartbeat">
+        <column name="LAST_HEARTBEAT" jdbc-type="BIGINT" allows-null="false"/>
+      </field>
+    </class>
+
 
   </package>
 </jdo>

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 1ea72a0..9fffd3f 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.model.MTableWrite;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.thrift.TException;
 
@@ -863,4 +864,18 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
     throws InvalidObjectException, MetaException {
     // TODO Auto-generated method stub
   }
+
+  @Override
+  public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) {
+  }
+
+  @Override
+  public void updateTableWrite(MTableWrite tw) {
+
+  }
+
+  @Override
+  public MTableWrite getTableWrite(String dbName, String tblName, long writeId) {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 3e6acc7..a763085 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.model.MTableWrite;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.thrift.TException;
 
@@ -879,6 +880,19 @@ public class DummyRawStoreForJdoConnection implements RawStore {
     throws InvalidObjectException, MetaException {
     // TODO Auto-generated method stub
   }
+
+  @Override
+  public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) {
+  }
+
+  @Override
+  public void updateTableWrite(MTableWrite tw) {
+  }
+
+  @Override
+  public MTableWrite getTableWrite(String dbName, String tblName, long writeId) {
+    return null;
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 1013f7c..e2777c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -233,8 +233,8 @@ public class Context {
      // Append task specific info to stagingPathName, instead of creating a sub-directory.
      // This way we don't have to worry about deleting the stagingPathName separately at
       // end of query execution.
-      // TODO# HERE
-      dir = fs.makeQualified(new Path(stagingPathName + "_" + getExecutionPrefix()));
+      dir = fs.makeQualified(new Path(
+          stagingPathName + "_" + this.executionId + "-" + TaskRunner.getTaskRunnerID()));
 
       LOG.debug("Created staging dir = " + dir + " for path = " + inputPath);
 
@@ -820,10 +820,6 @@ public class Context {
     this.skipTableMasking = skipTableMasking;
   }
 
-  public String getExecutionPrefix() {
-    return this.executionId + "-" + TaskRunner.getTaskRunnerID();
-  }
-
   public ExplainConfiguration getExplainConfig() {
     return explainConfig;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index b8a2c5a..6a0143a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -239,7 +239,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         }
       }
       if (isMmTable) {
-        Path manifestPath = new Path(specPath, "_tmp." + getPrefixedTaskId() + MANIFEST_EXTENSION);
+        Path manifestPath = new Path(specPath, "_tmp." + getMmPrefixedTaskId() + MANIFEST_EXTENSION);
         Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
         try {
           try (FSDataOutputStream out = fs.create(manifestPath)) {
@@ -254,10 +254,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       }
     }
 
-    private String getPrefixedTaskId() {
-      return conf.getExecutionPrefix() + "_" + taskId;
-    }
-
    private void commitOneOutPath(int idx, FileSystem fs, List<Path> commitPaths)
         throws IOException, HiveException {
       if ((bDynParts || isSkewedStoredAsSubDirectories)
@@ -328,10 +324,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           outPaths[filesIdx] = getTaskOutPath(taskId);
         } else {
           if (!bDynParts && !isSkewedStoredAsSubDirectories) {
-            finalPaths[filesIdx] = getFinalPath(getPrefixedTaskId(), specPath, extension);
+            finalPaths[filesIdx] = getFinalPath(getMmPrefixedTaskId(), specPath, extension);
          } else {
            // TODO# wrong!
-            finalPaths[filesIdx] = getFinalPath(getPrefixedTaskId(), specPath, extension);
+            finalPaths[filesIdx] = getFinalPath(getMmPrefixedTaskId(), specPath, extension);
           }
           outPaths[filesIdx] = finalPaths[filesIdx];
         }
@@ -725,6 +721,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
   }
 
+  private String getMmPrefixedTaskId() {
+    return AcidUtils.getMmFilePrefix(conf.getMmWriteId()) + taskId;
+  }
+
   protected Writable recordValue;
 
 
@@ -1216,16 +1216,16 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     FileSystem fs = specPath.getFileSystem(hconf);
     int targetLevel = (dpCtx == null) ? 1 : dpCtx.getNumDPCols();
     if (!success) {
-      FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
-          specPath, targetLevel, fs, new ExecPrefixPathFilter(conf.getExecutionPrefix()));
+      FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs,
+          new ExecPrefixPathFilter(AcidUtils.getMmFilePrefix(conf.getMmWriteId())));
       for (FileStatus status : statuses) {
         Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
         tryDelete(fs, status.getPath());
       }
       return;
     }
-    FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
-        specPath, targetLevel, fs, new ExecPrefixPathFilter(conf.getExecutionPrefix()));
+    FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs,
+        new ExecPrefixPathFilter(AcidUtils.getMmFilePrefix(conf.getMmWriteId())));
     if (statuses == null) return;
     LinkedList<FileStatus> results = new LinkedList<>();
     List<Path> manifests = new ArrayList<>(statuses.length);

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e3646da..f2389ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.model.MMasterKey;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
@@ -312,15 +313,28 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
 
         checkFileFormats(db, tbd, table);
 
+        boolean isAcid = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID;
+        if (tbd.isMmTable()) {
+          if (tbd.getReplace()) {
+            // TODO#: would need a list of new files to support. Then, old ones only would need
+            //        to be removed from MS (and FS). Also, per-partition IOW is problematic for
+            //        the prefix case.
+            throw new HiveException("Replace and MM are not supported");
+          }
+          if (isAcid) {
+            // TODO# need to make sure ACID writes to final directories. Otherwise, might need to move.
+            throw new HiveException("ACID and MM are not supported");
+          }
+        }
+
         // Create a data container
         DataContainer dc = null;
         if (tbd.getPartitionSpec().size() == 0) {
           dc = new DataContainer(table.getTTable());
          Utilities.LOG14535.info("loadTable called from " + tbd.getSourcePath() + " into " + tbd.getTable());
          db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(),
-              work.isSrcLocal(), isSkewedStoredAsDirs(tbd),
-              work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
-              hasFollowingStatsTask());
+              work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isAcid, hasFollowingStatsTask(),
+              tbd.getMmWriteId());
           if (work.getOutputs() != null) {
             work.getOutputs().add(new WriteEntity(table,
                 (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
@@ -376,11 +390,13 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       TaskInformation ti) throws HiveException, IOException, InvalidOperationException {
     List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec());
     db.validatePartitionNameCharacters(partVals);
-    Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable());
-    db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
+    Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath()
+        + " into " + tbd.getTable().getTableName());
+    db.loadSinglePartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
         tbd.getPartitionSpec(), tbd.getReplace(),
         tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
-        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable());
+        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
+        hasFollowingStatsTask(), tbd.getMmWriteId());
     Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
 
     if (ti.bucketCols != null || ti.sortCols != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index cda5f39..1ef15cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1082,6 +1082,12 @@ public class AcidUtils {
    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }
 
+  public static boolean isMmTable(Table table) {
+    // TODO: perhaps it should be a 3rd value for 'transactional'?
+    String value = table.getProperty(hive_metastoreConstants.TABLE_IS_MM);
+    return value != null && value.equalsIgnoreCase("true");
+  }
+
   /**
    * Sets the acidOperationalProperties in the configuration object argument.
    * @param conf Mutable configuration object
@@ -1161,4 +1167,8 @@ public class AcidUtils {
     }
     return AcidOperationalProperties.parseString(resultStr);
   }
+
+  public static String getMmFilePrefix(long mmWriteId) {
+    return "mm_" + mmWriteId + "_";
+  }
 }
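
The prefix scheme above is what the FileSinkOperator changes key on: each task writes files named getMmFilePrefix(writeId) + taskId, and the same prefix drives both the failure-cleanup filter and listFilesCreatedByQuery in Hive.java. A small sketch of the resulting names; the write id, task id and directory are made-up examples, not values from this patch:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.io.AcidUtils;

  public class MmFileNamingSketch {
    public static void main(String[] args) {
      long mmWriteId = 17L;                      // illustrative write id
      String taskId = "000000_0";                // illustrative task id from the execution engine
      Path tableDir = new Path("/warehouse/t");  // illustrative target directory

      String prefix = AcidUtils.getMmFilePrefix(mmWriteId);  // "mm_17_"
      Path dataFile = new Path(tableDir, prefix + taskId);   // /warehouse/t/mm_17_000000_0
      System.out.println(dataFile);

      // Deciding whether a file belongs to this write is a prefix match on the file name,
      // mirroring the PathFilter built in Hive.listFilesCreatedByQuery().
      System.out.println(dataFile.getName().startsWith(prefix));
    }
  }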

http://git-wip-us.apache.org/repos/asf/hive/blob/30fd19f4/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e43c600..5630392 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -51,6 +51,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 import javax.jdo.JDODataStoreException;
@@ -1469,15 +1470,29 @@ public class Hive {
     return getDatabase(currentDb);
   }
 
-  public void loadPartition(Path loadPath, String tableName,
-      Map<String, String> partSpec, boolean replace,
-      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, boolean isMmTable)
-          throws HiveException {
+  public void loadSinglePartition(Path loadPath, String tableName,
+      Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs,
+      boolean isSkewedStoreAsSubdir,  boolean isSrcLocal, boolean isAcid,
+      boolean hasFollowingStatsTask, Long mmWriteId) throws HiveException {
     Table tbl = getTable(tableName);
-    // TODO# dbl check if table is still mm for consistency
+    boolean isMmTableWrite = (mmWriteId != null);
+    Preconditions.checkState(isMmTableWrite == AcidUtils.isMmTable(tbl));
     loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs,
-        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, isMmTable);
+        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, mmWriteId);
+    if (isMmTableWrite) {
+      // The assumption behind committing here is that this partition is the only one outputted
+      commitMmTableWrite(tbl, mmWriteId);
+    }
+  }
+
+
+  private void commitMmTableWrite(Table tbl, Long mmWriteId)
+      throws HiveException {
+    try {
+      getMSC().finalizeTableWrite(tbl.getDbName(), tbl.getTableName(), mmWriteId, true);
+    } catch (TException e) {
+      throw new HiveException(e);
+    }
   }
 
   /**
@@ -1503,9 +1518,8 @@ public class Hive {
    */
  public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
      boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, boolean isMmTable)
+      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long mmWriteId)
           throws HiveException {
-
     Path tblDataLocationPath =  tbl.getDataLocation();
     try {
       Partition oldPart = getPartition(tbl, partSpec, false);
@@ -1542,12 +1556,12 @@ public class Hive {
       } else {
         newPartPath = oldPartPath;
       }
-      List<Path> newFiles = null, mmFiles = null;
-      if (isMmTable) {
-        mmFiles = handleMicromanagedPartition(
-            loadPath, tbl, replace, oldPart, newPartPath, isAcid);
+      List<Path> newFiles = null;
+      if (mmWriteId != null) {
+        Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath);
+        assert !isAcid && !replace;
         if (areEventsForDmlNeeded(tbl, oldPart)) {
-          newFiles = mmFiles;
+          newFiles = listFilesCreatedByQuery(loadPath, mmWriteId);
         }
       } else {
         if (replace || (oldPart == null && !isAcid)) {
@@ -1636,21 +1650,9 @@ public class Hive {
    return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null;
   }
 
-
-  private List<Path> handleMicromanagedPartition(Path loadPath, Table tbl, boolean replace,
-      Partition oldPart, Path newPartPath, boolean isAcid) throws HiveException {
-    Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath);
-    if (replace) {
-      // TODO#: would need a list of new files to support. Then, old ones only would need
-      //        to be removed from MS (and FS). Also, per-partition IOW is problematic for
-      //        the prefix case.
-      throw new HiveException("Replace and MM are not supported");
-    }
-    if (isAcid) {
-      // TODO# need to make sure ACID writes to final directories. Otherwise, might need to move.
-      throw new HiveException("ACID and MM are not supported");
-    }
+  private List<Path> listFilesCreatedByQuery(Path loadPath, long mmWriteId) throws HiveException {
     List<Path> newFiles = new ArrayList<Path>();
+    final String filePrefix = AcidUtils.getMmFilePrefix(mmWriteId);
     FileStatus[] srcs;
     FileSystem srcFs;
     try {
@@ -1664,19 +1666,27 @@ public class Hive {
       LOG.info("No sources specified: " + loadPath);
       return newFiles;
     }
-
+    PathFilter subdirFilter = null;
+ 
     // TODO: just like the move path, we only do one level of recursion.
     for (FileStatus src : srcs) {
       if (src.isDirectory()) {
+        if (subdirFilter == null) {
+          subdirFilter = new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+              return path.getName().startsWith(filePrefix);
+            }
+          };
+        }
         try {
-          for (FileStatus srcFile :
-            srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)) {
+          for (FileStatus srcFile : srcFs.listStatus(src.getPath(), subdirFilter)) {
             newFiles.add(srcFile.getPath());
           }
         } catch (IOException e) {
           throw new HiveException(e);
         }
-      } else {
+      } else if (src.getPath().getName().startsWith(filePrefix)) {
         newFiles.add(src.getPath());
       }
     }
@@ -1878,7 +1888,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
              Utilities.LOG14535.info("loadPartition called for DPP from " + partPath + " to " + tbl.getTableName());
              Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
                  replace, true, listBucketingEnabled,
-                  false, isAcid, hasFollowingStatsTask, false); // TODO# special case #N
+                  false, isAcid, hasFollowingStatsTask, null); // TODO# special case #N
               partitionsMap.put(fullPartSpec, newPartition);
 
               if (inPlaceEligible) {
@@ -1910,6 +1920,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       for (Future future : futures) {
         future.get();
       }
+      // TODO# we would commit the txn to metastore here
     } catch (InterruptedException | ExecutionException e) {
       LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
       //cancel other futures
@@ -1959,8 +1970,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param isAcid true if this is an ACID based write
    */
  public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal,
-      boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask)
-      throws HiveException {
+      boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
+      Long mmWriteId) throws HiveException {
 
     List<Path> newFiles = null;
     Table tbl = getTable(tableName);
@@ -1968,17 +1979,21 @@ private void constructOneLBLocationMap(FileStatus fSta,
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
     }
-    if (replace) {
-      Path tableDest = tbl.getPath();
-      replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal);
-    } else {
-      FileSystem fs;
-      try {
-        fs = tbl.getDataLocation().getFileSystem(sessionConf);
-        copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles);
-      } catch (IOException e) {
-        throw new HiveException("addFiles: filesystem error in check phase", e);
+    if (mmWriteId == null) {
+      if (replace) {
+        Path tableDest = tbl.getPath();
+        replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal);
+      } else {
+        FileSystem fs;
+        try {
+          fs = tbl.getDataLocation().getFileSystem(sessionConf);
+          copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles);
+        } catch (IOException e) {
+          throw new HiveException("addFiles: filesystem error in check phase", e);
+        }
       }
+    } else {
+      newFiles = listFilesCreatedByQuery(loadPath, mmWriteId);
     }
     if (!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
      StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
@@ -2012,6 +2027,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     }
 
+    if (mmWriteId != null) {
+      commitMmTableWrite(tbl, mmWriteId);
+    }
+
     fireInsertEvent(tbl, null, newFiles);
   }
 
@@ -3987,4 +4006,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     }
   }
+
+
+  public long getNextTableWriteId(String dbName, String tableName) throws HiveException {
+    try {
+      return getMSC().getNextTableWriteId(dbName, tableName);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
 };
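
Taken together, the Hive.java changes sketch the intended MM write lifecycle: allocate a write id, let the tasks write files under the mm_<writeId>_ prefix, load with that id, and only then finalize the write in the metastore. A compressed sketch of that flow, assuming a Hive instance db and files already produced by the query; the database/table names and the path are placeholders, not part of this patch:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.metadata.Hive;
  import org.apache.hadoop.hive.ql.metadata.HiveException;

  public class MmLoadFlowSketch {
    public static void load(Hive db, Path loadPath) throws HiveException {
      // 1. Allocate a write id for the target MM table (new metastore client call surfaced above).
      long mmWriteId = db.getNextTableWriteId("default", "mm_tbl");

      // 2. The query tasks write their output as mm_<writeId>_<taskId> files under loadPath;
      //    nothing is moved afterwards, which is the point of MM tables.

      // 3. Register the files with the table. A non-null mmWriteId makes loadTable skip the
      //    copy/replace path, list the mm_<writeId>_ files, and commit the write through
      //    commitMmTableWrite / finalizeTableWrite. Replace and ACID are rejected earlier
      //    in MoveTask for MM tables.
      boolean replace = false, srcLocal = false, skewedSubdirs = false, isAcid = false, statsFollow = false;
      db.loadTable(loadPath, "mm_tbl", replace, srcLocal, skewedSubdirs, isAcid, statsFollow, mmWriteId);
    }
  }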
