Shant Hovsepian created IMPALA-10236:
----------------------------------------

             Summary: Queries Stuck if catalog topic update compression fails
                 Key: IMPALA-10236
                 URL: https://issues.apache.org/jira/browse/IMPALA-10236
             Project: IMPALA
          Issue Type: Bug
          Components: Catalog
    Affects Versions: Impala 2.12.0
            Reporter: Shant Hovsepian


If a compressed Catalog Object doesn't fit into a 2GB buffer, an error is 
thrown. 

 
{code:java}
/// Compresses a serialized catalog object using LZ4 and stores it back in 'dst'. Stores
/// the size of the uncompressed catalog object in the first sizeof(uint32_t) bytes of
/// 'dst'. The compression fails if the uncompressed data size exceeds 0x7E000000 bytes.
Status CompressCatalogObject(const uint8_t* src, uint32_t size, std::string* dst)
    WARN_UNUSED_RESULT;

{code}
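
For a sense of scale, here is a minimal sketch of the limit quoted in the comment above, written as a Java-side size check. The class name, constant name and method are hypothetical and do not exist in Impala; only the value 0x7E000000 comes from the comment.

```java
// Sketch only: the names below are hypothetical, not part of Impala.
// The only value taken from the source is the 0x7E000000 limit quoted above.
final class CompressionLimitSketch {
  // Largest uncompressed size, in bytes, that the quoted comment says can be compressed.
  static final long MAX_UNCOMPRESSED_BYTES = 0x7E000000L;

  // Returns true if a serialized object of this size is within the quoted limit.
  static boolean fitsCompressionLimit(long serializedSize) {
    return serializedSize >= 0 && serializedSize <= MAX_UNCOMPRESSED_BYTES;
  }

  public static void main(String[] args) {
    // The limit in bytes and in MiB.
    System.out.println(MAX_UNCOMPRESSED_BYTES);                 // 2113929216
    System.out.println(MAX_UNCOMPRESSED_BYTES / (1024 * 1024)); // 2016
    // Headroom between the limit and the largest possible signed 32-bit length.
    System.out.println(Integer.MAX_VALUE - MAX_UNCOMPRESSED_BYTES); // 33554431
    System.out.println(fitsCompressionLimit(MAX_UNCOMPRESSED_BYTES + 1)); // false
  }
}
```

So the limit sits at 2016 MiB, about 32 MiB below the 2 GiB mark.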
 

CatalogServer::AddPendingTopicItem() calls CompressCatalogObject(), and is itself invoked from the following JNI entry point:

 
{code:java}
// Add a catalog update to pending_topic_updates_.
extern "C"
JNIEXPORT jboolean JNICALL
Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env,
    jclass caller_class, jlong native_catalog_server_ptr, jstring key, jlong version,
    jbyteArray serialized_object, jboolean deleted) {
  std::string key_string;
  {
    JniUtfCharGuard key_str;
    if (!JniUtfCharGuard::create(env, key, &key_str).ok()) {
      return static_cast<jboolean>(false);
    }
    key_string.assign(key_str.get());
  }
  JniScopedArrayCritical obj_buf;
  if (!JniScopedArrayCritical::Create(env, serialized_object, &obj_buf)) {
    return static_cast<jboolean>(false);
  }
  reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->
      AddPendingTopicItem(std::move(key_string), version, obj_buf.get(),
      static_cast<uint32_t>(obj_buf.size()), deleted);
  return static_cast<jboolean>(true);
}

{code}
However, the JNI wrapper disregards the return value of AddPendingTopicItem() and returns true whether or not the call succeeded.

More recently, IMPALA-10076 changed the Java caller to check the value returned by the native call:
{code:java}
-        if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v1Key,
-            obj.catalog_version, data, delete)) {
+        int actualSize = FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
+            v1Key, obj.catalog_version, data, delete);
+        if (actualSize < 0) {
           LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", delete="
               + delete + ", data_size=" + data.length);
+        } else if (summary != null && obj.type == HDFS_PARTITION) {
+          summary.update(true, delete, obj.hdfs_partition.partition_name,
+              obj.catalog_version, data.length, actualSize);
         }
       }
{code}
CatalogServiceCatalog::addCatalogObject() logs an error message, but the catalog
update still doesn't go through, which leaves queries stuck:
{code:java}
      if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) {
        String v1Key = CatalogServiceConstants.CATALOG_TOPIC_V1_PREFIX + key;
        byte[] data = serializer.serialize(obj);
        int actualSize = FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
            v1Key, obj.catalog_version, data, delete);
        if (actualSize < 0) {
          LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", delete="
              + delete + ", data_size=" + data.length);
        } else if (summary != null && obj.type == HDFS_PARTITION) {
          summary.update(true, delete, obj.hdfs_partition.partition_name,
              obj.catalog_version, data.length, actualSize);
        }
      }

{code}
I'm not sure what the right behavior would be. We could handle the compression
failure and retry with more aggressive compression, or we could unblock the
catalog update.
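
As a rough illustration of the second option, the sketch below records which objects failed instead of only logging them, so that the caller could decide how to unblock the update. Everything in it is hypothetical: `fakeNativeAdd` merely stands in for the native call and takes a size rather than a byte array, the halving is a made-up placeholder for compression, and none of these names exist in Impala.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: every name here is a hypothetical stand-in, not Impala code.
final class TopicUpdateSketch {
  // Limit quoted in the CompressCatalogObject() comment.
  static final long MAX_UNCOMPRESSED_BYTES = 0x7E000000L;

  // Stand-in for the native call: returns a pretend compressed size,
  // or -1 when the object is too large to compress.
  static int fakeNativeAdd(String key, long dataSize) {
    if (dataSize > MAX_UNCOMPRESSED_BYTES) return -1;
    return (int) (dataSize / 2); // placeholder, not a real compression ratio
  }

  // Adds every object and returns the keys that failed, rather than
  // logging the failure and silently carrying on.
  static List<String> addAll(String[] keys, long[] sizes) {
    List<String> failedKeys = new ArrayList<>();
    for (int i = 0; i < keys.length; i++) {
      if (fakeNativeAdd(keys[i], sizes[i]) < 0) {
        failedKeys.add(keys[i]);
      }
    }
    return failedKeys;
  }

  public static void main(String[] args) {
    String[] keys = {"small_table", "huge_table"};
    long[] sizes = {1024L, MAX_UNCOMPRESSED_BYTES + 1};
    // Only the oversized object is reported as failed.
    System.out.println(addAll(keys, sizes)); // [huge_table]
  }
}
```

With the failed keys in hand, the caller could surface an error for just those objects while the rest of the topic update proceeds. Whether that is the right behavior is exactly the open question here.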

 


