Bei-z commented on a change in pull request #8542:
URL: https://github.com/apache/arrow/pull/8542#discussion_r516251296



##########
File path: cpp/src/arrow/util/basic_decimal.cc
##########
@@ -395,6 +395,34 @@ BasicDecimal128& BasicDecimal128::operator*=(const 
BasicDecimal128& right) {
   return *this;
 }
 
+/// Expands the given big endian array of uint64_t into an array of uint32_t.
+/// The value of input array is expected to be positive. The result_array will
+/// remove leading zeros from the input array.
+/// \param value_array an big endian array to represent the value
+/// \param result_array an array of length N*2 to set with the value
+/// \result the output length of the array
+template <size_t N>
+static int64_t FillInArray(std::array<uint64_t, N>& value_array, uint32_t* 
result_array) {
+  int64_t next_index = 0;
+  for (size_t i = 0; i < N; i++) {
+    if (value_array[i] == 0) {

Review comment:
       Changed accordingly, thanks!

##########
File path: cpp/src/arrow/util/basic_decimal.cc
##########
@@ -549,24 +570,28 @@ static DecimalStatus SingleDivide(const uint32_t* 
dividend, int64_t dividend_len
   return DecimalStatus::kSuccess;
 }
 
-DecimalStatus BasicDecimal128::Divide(const BasicDecimal128& divisor,
-                                      BasicDecimal128* result,
-                                      BasicDecimal128* remainder) const {
+/// \brief Do a division where the divisor fits into a single 32 bit value.
+template <class DecimalClass>
+static DecimalStatus DecimalDivide(const DecimalClass& dividend,
+                                   const DecimalClass& divisor, DecimalClass* 
result,
+                                   DecimalClass* remainder) {
+  static int64_t kDecimalArrayLength =
+      std::is_same<DecimalClass, BasicDecimal128>::value ? 4 : 8;

Review comment:
       Done, thx!

##########
File path: cpp/src/arrow/util/basic_decimal.cc
##########
@@ -490,49 +501,59 @@ static void FixDivisionSigns(BasicDecimal128* result, 
BasicDecimal128* remainder
   }
 }
 
-/// \brief Build a BasicDecimal128 from a list of ints.
+/// \brief Build a big endian array of uint64_t from a list of uint32_t.
+template <size_t N>
+static DecimalStatus BuildFromArray(std::array<uint64_t, N>* result_array,
+                                    uint32_t* array, int64_t length) {
+  for (int64_t i = length - 2 * N - 1; i >= 0; i--) {
+    if (array[i] != 0) {
+      return DecimalStatus::kOverflow;
+    }
+  }
+  int64_t next_index = length - 1;
+  for (int64_t i = N - 1; i >= 0; i--) {
+    uint64_t lower_bits = (next_index < 0) ? 0 : array[next_index--];
+    (*result_array)[i] =
+        (next_index < 0)
+            ? lower_bits
+            : ((static_cast<uint64_t>(array[next_index--]) << 32) + 
lower_bits);
+  }
+  return DecimalStatus::kSuccess;
+}
+
+/// \brief Build a BasicDecimal128 from a list of uint32_t.
 static DecimalStatus BuildFromArray(BasicDecimal128* value, uint32_t* array,
                                     int64_t length) {
-  switch (length) {
-    case 0:
-      *value = {static_cast<int64_t>(0)};
-      break;
-    case 1:
-      *value = {static_cast<int64_t>(array[0])};
-      break;
-    case 2:
-      *value = {static_cast<int64_t>(0),
-                (static_cast<uint64_t>(array[0]) << 32) + array[1]};
-      break;
-    case 3:
-      *value = {static_cast<int64_t>(array[0]),
-                (static_cast<uint64_t>(array[1]) << 32) + array[2]};
-      break;
-    case 4:
-      *value = {(static_cast<int64_t>(array[0]) << 32) + array[1],
-                (static_cast<uint64_t>(array[2]) << 32) + array[3]};
-      break;
-    case 5:
-      if (array[0] != 0) {
-        return DecimalStatus::kOverflow;
-      }
-      *value = {(static_cast<int64_t>(array[1]) << 32) + array[2],
-                (static_cast<uint64_t>(array[3]) << 32) + array[4]};
-      break;
-    default:
-      return DecimalStatus::kOverflow;
+  std::array<uint64_t, 2> result_array;
+  auto status = BuildFromArray(&result_array, array, length);
+  if (status != DecimalStatus::kSuccess) {
+    return status;
   }
+  *value = {static_cast<int64_t>(result_array[0]), result_array[1]};
+  return DecimalStatus::kSuccess;
+}
 
+/// \brief Build a BasicDecimal256 from a list of uint32_t.
+static DecimalStatus BuildFromArray(BasicDecimal256* value, uint32_t* array,
+                                    int64_t length) {
+  std::array<uint64_t, 4> result_array;
+  auto status = BuildFromArray(&result_array, array, length);
+  if (status != DecimalStatus::kSuccess) {
+    return status;
+  }
+  std::reverse(result_array.begin(), result_array.end());

Review comment:
       done. Thank you!

##########
File path: cpp/src/arrow/util/basic_decimal.cc
##########
@@ -395,6 +395,34 @@ BasicDecimal128& BasicDecimal128::operator*=(const 
BasicDecimal128& right) {
   return *this;
 }
 
+/// Expands the given big endian array of uint64_t into an array of uint32_t.
+/// The value of input array is expected to be positive. The result_array will
+/// remove leading zeros from the input array.
+/// \param value_array an big endian array to represent the value
+/// \param result_array an array of length N*2 to set with the value
+/// \result the output length of the array
+template <size_t N>
+static int64_t FillInArray(std::array<uint64_t, N>& value_array, uint32_t* 
result_array) {
+  int64_t next_index = 0;
+  for (size_t i = 0; i < N; i++) {
+    if (value_array[i] == 0) {
+      if (next_index != 0) {
+        result_array[next_index++] = 0;
+        result_array[next_index++] = 0;
+      }
+    } else if (value_array[i] <= std::numeric_limits<uint32_t>::max()) {

Review comment:
       This special case is needed because we want to strip the leading zeros 
from the resulting array of uint32_t. When the first non-zero uint64_t value is 
no larger than the max of uint32_t, we assign it directly to the first element 
of the result array. When such a value is not the first non-zero value, we 
first fill the next element of the result array with 0 and then store the 
value after it.

##########
File path: cpp/src/arrow/util/basic_decimal.cc
##########
@@ -479,7 +489,8 @@ static void ShiftArrayRight(uint32_t* array, int64_t 
length, int64_t bits) {
 
 /// \brief Fix the signs of the result and remainder at the end of the 
division based on
 /// the signs of the dividend and divisor.
-static void FixDivisionSigns(BasicDecimal128* result, BasicDecimal128* 
remainder,
+template <class DecimalClass>
+static void FixDivisionSigns(DecimalClass* result, DecimalClass* remainder,

Review comment:
       Done. Thank you!

##########
File path: cpp/src/arrow/util/basic_decimal.cc
##########
@@ -549,24 +570,28 @@ static DecimalStatus SingleDivide(const uint32_t* 
dividend, int64_t dividend_len
   return DecimalStatus::kSuccess;
 }
 
-DecimalStatus BasicDecimal128::Divide(const BasicDecimal128& divisor,
-                                      BasicDecimal128* result,
-                                      BasicDecimal128* remainder) const {
+/// \brief Do a division where the divisor fits into a single 32 bit value.
+template <class DecimalClass>
+static DecimalStatus DecimalDivide(const DecimalClass& dividend,
+                                   const DecimalClass& divisor, DecimalClass* 
result,
+                                   DecimalClass* remainder) {
+  static int64_t kDecimalArrayLength =

Review comment:
       Done, thx!

##########
File path: cpp/src/arrow/util/basic_decimal.cc
##########
@@ -404,51 +432,33 @@ BasicDecimal128& BasicDecimal128::operator*=(const 
BasicDecimal128& right) {
 /// \result the output length of the array
 static int64_t FillInArray(const BasicDecimal128& value, uint32_t* array,
                            bool& was_negative) {
-  uint64_t high;
-  uint64_t low;
-  const int64_t highbits = value.high_bits();
-  const uint64_t lowbits = value.low_bits();
-
-  if (highbits < 0) {
-    low = ~lowbits + 1;
-    high = static_cast<uint64_t>(~highbits);
-    if (low == 0) {
-      ++high;
-    }
-    was_negative = true;
-  } else {
-    low = lowbits;
-    high = static_cast<uint64_t>(highbits);
-    was_negative = false;
-  }
-
-  if (high != 0) {
-    if (high > std::numeric_limits<uint32_t>::max()) {
-      array[0] = static_cast<uint32_t>(high >> 32);
-      array[1] = static_cast<uint32_t>(high);
-      array[2] = static_cast<uint32_t>(low >> 32);
-      array[3] = static_cast<uint32_t>(low);
-      return 4;
-    }
-
-    array[0] = static_cast<uint32_t>(high);
-    array[1] = static_cast<uint32_t>(low >> 32);
-    array[2] = static_cast<uint32_t>(low);
-    return 3;
-  }
-
-  if (low >= std::numeric_limits<uint32_t>::max()) {
-    array[0] = static_cast<uint32_t>(low >> 32);
-    array[1] = static_cast<uint32_t>(low);
-    return 2;
-  }
+  BasicDecimal128 abs_value = BasicDecimal128::Abs(value);
+  was_negative = value.Sign() < 0;

Review comment:
       Done. Thank you!

##########
File path: cpp/src/arrow/util/decimal_benchmark.cc
##########
@@ -158,6 +158,7 @@ static void BinaryMathOp256(benchmark::State& state) {  // 
NOLINT non-const refe
   for (auto _ : state) {
     for (int x = 0; x < kValueSize; x += 5) {
       benchmark::DoNotOptimize(v1[x + 2] * v2[x + 2]);
+      benchmark::DoNotOptimize(v1[x + 3] / v2[x + 3]);

Review comment:
       The change did make Decimal128 a little bit slower:
   Benchmark result after adding BasicDecimal256 Division support:
   BinaryMathOp128 164 ns 164 ns 4134767 items_per_second=61.0451M/s
   BinaryMathOp256 161 ns 161 ns 4197025 items_per_second=62.2226M/s
   
   Benchmark result before adding BasicDecimal256 Division support:
   BinaryMathOp128 153 ns 153 ns 4380604 items_per_second=65.4108M/s
   BinaryMathOp256 34.5 ns 34.5 ns 19945733 items_per_second=289.557M/s
   
   These metrics have also been added to the PR description.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to