benibus commented on code in PR #13516:
URL: https://github.com/apache/arrow/pull/13516#discussion_r974552546


##########
cpp/src/arrow/memory_pool.cc:
##########
@@ -667,8 +667,26 @@ MemoryPool* default_memory_pool() {
 
 #ifndef ARROW_JEMALLOC
 Status jemalloc_set_decay_ms(int ms) {
-  return Status::Invalid("jemalloc support is not built");
+  return Status::NotImplemented("jemalloc support is not built");
 }
+
+Result<int64_t> jemalloc_get_stat(const char* name) {
+  return Status::NotImplemented("jemalloc support is not built");
+}
+
+Status jemalloc_peak_reset() {
+  return Status::NotImplemented("jemalloc support is not built");
+}
+
+Status jemalloc_stats_print(void (*write_cb)(void*, const char*), void* 
cbopaque,
+                            const char* opts) {
+  return Status::NotImplemented("jemalloc support is not built");
+}
+
+Result<std::string> jemalloc_stats_print(const char* opts) {
+  return Status::NotImplemented("jemalloc support is not built");
+}

Review Comment:
   ```suggestion
   Status jemalloc_stats_print(const char* opts) {
     return Status::NotImplemented("jemalloc support is not built");
   }
   
   Status jemalloc_stats_print(std::function<void(const char*)> write_cb,
                               const char* opts) {
     return Status::NotImplemented("jemalloc support is not built");
   }
   
   Result<std::string> jemalloc_stats_string(const char* opts) {
     return Status::NotImplemented("jemalloc support is not built");
   }
   ```



##########
cpp/src/arrow/memory_pool.h:
##########
@@ -175,6 +177,31 @@ ARROW_EXPORT Status jemalloc_memory_pool(MemoryPool** out);
 ARROW_EXPORT
 Status jemalloc_set_decay_ms(int ms);
 
+/// \brief Get basic statistics from jemalloc's mallctl.
+/// See the MALLCTL NAMESPACE section in jemalloc project documentation for
+/// available stats.
+ARROW_EXPORT
+Result<int64_t> jemalloc_get_stat(const char* name);
+
+/// \brief Reset the counter for peak bytes allocated in the calling thread to 
zero.
+/// This affects subsequent calls to thread.peak.read, but not the values 
returned by
+/// thread.allocated or thread.deallocated.
+ARROW_EXPORT
+Status jemalloc_peak_reset();
+
+/// \brief Print summary statistics in human-readable form to stderr.
+/// See malloc_stats_print documentation in jemalloc project documentation for
+/// available opt flags.
+ARROW_EXPORT
+Status jemalloc_stats_print(void (*write_cb)(void*, const char*), void* 
cbopaque,
+                            const char* opts = "");
+
+/// \brief Get summary statistics in human-readable form.
+/// See malloc_stats_print documentation in jemalloc project documentation for
+/// available opt flags.
+ARROW_EXPORT
+Result<std::string> jemalloc_stats_print(const char* opts = "");

Review Comment:
   ```suggestion
   /// \brief Print summary statistics in human-readable form to stderr.
   /// See malloc_stats_print documentation in jemalloc project documentation 
for
   /// available opt flags.
   ARROW_EXPORT
   Status jemalloc_stats_print(const char* opts = "");
   
   /// \brief Print summary statistics in human-readable form using a callback
   /// See malloc_stats_print documentation in jemalloc project documentation 
for
   /// available opt flags.
   ARROW_EXPORT
   Status jemalloc_stats_print(std::function<void(const char*)> write_cb,
                               const char* opts = "");
   
   /// \brief Get summary statistics in human-readable form.
   /// See malloc_stats_print documentation in jemalloc project documentation 
for
   /// available opt flags.
   ARROW_EXPORT
   Result<std::string> jemalloc_stats_string(const char* opts = "");
   ```
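   I introduced `jemalloc_stats_string` only to distinguish it from the proposed default overload `jemalloc_stats_print(const char* opts)`. Both would take the same single `opts` argument and differ only in return type (`Result<std::string>` vs. `Status`), and C++ cannot overload functions on return type alone.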



##########
cpp/src/arrow/memory_pool_jemalloc.cc:
##########
@@ -153,4 +154,52 @@ Status jemalloc_set_decay_ms(int ms) {
 
 #undef RETURN_IF_JEMALLOC_ERROR
 
+#ifdef ARROW_JEMALLOC
+Result<int64_t> jemalloc_get_stat(const char* name) {
+  size_t sz = sizeof(uint64_t);
+  int err;
+  uint64_t value;
+
+  // Update the statistics cached by mallctl.
+  if (std::strcmp(name, "stats.allocated") == 0 ||
+      std::strcmp(name, "stats.active") == 0 ||
+      std::strcmp(name, "stats.metadata") == 0 ||
+      std::strcmp(name, "stats.resident") == 0 ||
+      std::strcmp(name, "stats.mapped") == 0 ||
+      std::strcmp(name, "stats.retained") == 0) {
+    uint64_t epoch;
+    mallctl("epoch", &epoch, &sz, &epoch, sz);
+  }
+
+  err = mallctl(name, &value, &sz, nullptr, 0);
+
+  if (err) {
+    return arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", &name);
+  }
+
+  return value;
+}
+
+Status jemalloc_peak_reset() {
+  int err = mallctl("thread.peak.reset", nullptr, nullptr, nullptr, 0);
+  return err ? arrow::internal::IOErrorFromErrno(err, "Failed resetting 
thread.peak.")
+             : Status::OK();
+}
+
+Result<std::string> jemalloc_stats_print(const char* opts) {
+  std::string stats;
+  auto write_cb = [](void* opaque, const char* str) {
+    reinterpret_cast<std::string*>(opaque)->append(str);
+  };
+  malloc_stats_print(write_cb, &stats, opts);
+  return stats;
+}
+
+Status jemalloc_stats_print(void (*write_cb)(void*, const char*), void* 
cbopaque,
+                            const char* opts) {
+  malloc_stats_print(write_cb, cbopaque, opts);
+  return Status::OK();
+}

Review Comment:
   ```suggestion
   Result<std::string> jemalloc_stats_string(const char* opts) {
     std::string stats;
     auto write_cb = [&stats](const char* str) { stats.append(str); };
     ARROW_UNUSED(jemalloc_stats_print(write_cb, opts));
     return stats;
   }
   
   Status jemalloc_stats_print(const char* opts) {
     malloc_stats_print(nullptr, nullptr, opts);
     return Status::OK();
   }
   
   Status jemalloc_stats_print(std::function<void(const char*)> write_cb,
                               const char* opts) {
     auto cb_wrapper = [](void* opaque, const char* str) {
       (*static_cast<std::function<void(const char*)>*>(opaque))(str);
     };
     if (write_cb) {
       malloc_stats_print(cb_wrapper, &write_cb, opts);
     }
     return Status::OK();
   }
   ```
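   You can't pass a `std::function` where a C function pointer is expected. A `std::function` is a class that wraps an arbitrary callable object, which may or may not be a plain function pointer. Instead, pass a captureless wrapper lambda (`cb_wrapper`) as the function pointer, and pass the address of the `std::function` as the opaque `cbopaque` argument. The wrapper casts the opaque pointer back to `std::function<void(const char*)>*` and invokes it.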



##########
cpp/src/arrow/memory_pool_test.cc:
##########
@@ -168,7 +169,107 @@ TEST(Jemalloc, SetDirtyPageDecayMillis) {
 #ifdef ARROW_JEMALLOC
   ASSERT_OK(jemalloc_set_decay_ms(0));
 #else
-  ASSERT_RAISES(Invalid, jemalloc_set_decay_ms(0));
+  ASSERT_RAISES(NotImplemented, jemalloc_set_decay_ms(0));
+#endif
+}
+
+TEST(Jemalloc, GetAllocationStats) {
+#ifdef ARROW_JEMALLOC
+  uint8_t* data;
+  int64_t allocated, active, metadata, resident, mapped, retained, allocated0, 
active0,
+      metadata0, resident0, mapped0, retained0;
+  int64_t thread_allocated, thread_deallocated, thread_peak_read, 
thread_allocated0,
+      thread_deallocated0, thread_peak_read0;
+  auto pool = default_memory_pool();
+  ABORT_NOT_OK(jemalloc_memory_pool(&pool));
+  ASSERT_EQ("jemalloc", pool->backend_name());
+
+  // Record stats before allocating
+  ASSERT_OK_AND_ASSIGN(allocated0, jemalloc_get_stat("stats.allocated"));
+  ASSERT_OK_AND_ASSIGN(active0, jemalloc_get_stat("stats.active"));
+  ASSERT_OK_AND_ASSIGN(metadata0, jemalloc_get_stat("stats.metadata"));
+  ASSERT_OK_AND_ASSIGN(resident0, jemalloc_get_stat("stats.resident"));
+  ASSERT_OK_AND_ASSIGN(mapped0, jemalloc_get_stat("stats.mapped"));
+  ASSERT_OK_AND_ASSIGN(retained0, jemalloc_get_stat("stats.retained"));
+  ASSERT_OK_AND_ASSIGN(thread_allocated0, 
jemalloc_get_stat("thread.allocated"));
+  ASSERT_OK_AND_ASSIGN(thread_deallocated0, 
jemalloc_get_stat("thread.deallocated"));
+  ASSERT_OK_AND_ASSIGN(thread_peak_read0, 
jemalloc_get_stat("thread.peak.read"));
+
+  // Allocate memory
+  ASSERT_OK(jemalloc_set_decay_ms(10000));
+  ASSERT_OK(pool->Allocate(1025, &data));
+  ASSERT_EQ(pool->bytes_allocated(), 1025);
+  ASSERT_OK(pool->Reallocate(1025, 1023, &data));
+  ASSERT_EQ(pool->bytes_allocated(), 1023);
+
+  // Record stats after allocating
+  ASSERT_OK_AND_ASSIGN(allocated, jemalloc_get_stat("stats.allocated"));
+  ASSERT_OK_AND_ASSIGN(active, jemalloc_get_stat("stats.active"));
+  ASSERT_OK_AND_ASSIGN(metadata, jemalloc_get_stat("stats.metadata"));
+  ASSERT_OK_AND_ASSIGN(resident, jemalloc_get_stat("stats.resident"));
+  ASSERT_OK_AND_ASSIGN(mapped, jemalloc_get_stat("stats.mapped"));
+  ASSERT_OK_AND_ASSIGN(retained, jemalloc_get_stat("stats.retained"));
+  ASSERT_OK_AND_ASSIGN(thread_allocated, 
jemalloc_get_stat("thread.allocated"));
+  ASSERT_OK_AND_ASSIGN(thread_deallocated, 
jemalloc_get_stat("thread.deallocated"));
+  ASSERT_OK_AND_ASSIGN(thread_peak_read, 
jemalloc_get_stat("thread.peak.read"));
+
+  // Check allocated stats pre-allocation
+  ASSERT_NEAR(allocated0, 120000, 100000);
+  ASSERT_NEAR(active0, 75000, 70000);
+  ASSERT_NEAR(metadata0, 3000000, 1000000);
+  ASSERT_NEAR(resident0, 3000000, 1000000);
+  ASSERT_NEAR(mapped0, 6500000, 1000000);
+  ASSERT_NEAR(retained0, 1500000, 2000000);
+
+  // Check allocated stats change due to allocation
+  ASSERT_NEAR(allocated - allocated0, 70000, 50000);
+  ASSERT_NEAR(active - active0, 100000, 90000);
+  ASSERT_NEAR(metadata - metadata0, 500, 460);
+  ASSERT_NEAR(resident - resident0, 120000, 110000);
+  ASSERT_NEAR(mapped - mapped0, 100000, 90000);
+  ASSERT_NEAR(retained - retained0, 0, 40000);
+
+  ASSERT_NEAR(thread_peak_read - thread_peak_read0, 1024, 700);
+  ASSERT_NEAR(thread_allocated - thread_allocated0, 2500, 500);
+  ASSERT_EQ(thread_deallocated - thread_deallocated0, 1280);
+
+  // Resetting thread peak read metric
+  ASSERT_OK(pool->Allocate(12560, &data));
+  ASSERT_OK_AND_ASSIGN(thread_peak_read, 
jemalloc_get_stat("thread.peak.read"));
+  ASSERT_NEAR(thread_peak_read, 15616, 1000);
+  ASSERT_OK(jemalloc_peak_reset());
+  ASSERT_OK(pool->Allocate(1256, &data));
+  ASSERT_OK_AND_ASSIGN(thread_peak_read, 
jemalloc_get_stat("thread.peak.read"));
+  ASSERT_NEAR(thread_peak_read, 1280, 100);
+
+  // Print statistics to stderr
+  ASSERT_OK(jemalloc_stats_print(nullptr, nullptr, "J"));
+
+  // Read statistics into std::string
+  ASSERT_OK_AND_ASSIGN(std::string stats, jemalloc_stats_print("Jax"));
+
+  // Read statistics into std::string with a lambda
+  std::string stats2;
+  auto write_cb = [](void* opaque, const char* str) {
+    reinterpret_cast<std::string*>(opaque)->append(str);
+  };
+  ASSERT_OK(jemalloc_stats_print(write_cb, &stats2, "Jax"));
+
+  ASSERT_EQ(stats.rfind("{\"jemalloc\":{\"version\"", 0), 0);
+  ASSERT_EQ(stats2.rfind("{\"jemalloc\":{\"version\"", 0), 0);
+  ASSERT_EQ(stats.substr(0, 100), stats2.substr(0, 100));
+#else
+  std::string stats;
+  auto write_cb = [](void* opaque, const char* str) {
+    reinterpret_cast<std::string*>(opaque)->append(str);
+  };
+  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("thread.peak.read"));
+  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocated"));
+  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocated"));
+  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocatedp"));
+  ASSERT_RAISES(NotImplemented, jemalloc_peak_reset());
+  ASSERT_RAISES(NotImplemented, jemalloc_stats_print(write_cb, &stats, "Jax"));

Review Comment:
   ```suggestion
     ASSERT_OK(jemalloc_stats_print("J"));
   
     // Read statistics into std::string
     ASSERT_OK_AND_ASSIGN(std::string stats, jemalloc_stats_string("Jax"));
   
     // Read statistics into std::string with a lambda
     std::string stats2;
     auto write_cb = [&stats2](const char* str) { stats2.append(str); };
     ASSERT_OK(jemalloc_stats_print(write_cb, "Jax"));
   
     ASSERT_EQ(stats.rfind("{\"jemalloc\":{\"version\"", 0), 0);
     ASSERT_EQ(stats2.rfind("{\"jemalloc\":{\"version\"", 0), 0);
     ASSERT_EQ(stats.substr(0, 100), stats2.substr(0, 100));
   #else
     std::string stats;
     auto write_cb = [&stats](const char* str) { stats.append(str); };
     ASSERT_RAISES(NotImplemented, jemalloc_get_stat("thread.peak.read"));
     ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocated"));
     ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocated"));
     ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocatedp"));
     ASSERT_RAISES(NotImplemented, jemalloc_peak_reset());
     ASSERT_RAISES(NotImplemented, jemalloc_stats_print(write_cb, "Jax"));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to