alamb commented on issue #12136:
URL: https://github.com/apache/datafusion/issues/12136#issuecomment-3298085828

   I just retried with DataFusion 50 and unfortunately the reproducer still fails the same way:
   
   > Error: Context("Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes", ResourcesExhausted("Failed to allocate additional 228.6 KB for ExternalSorter[0] with 0.0 B already allocated for this reservation - 0.0 B remain available for the total pool"))
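
   As the error message suggests, another knob is `sort_spill_reservation_bytes`, which can be lowered via `SessionConfig`. A minimal sketch (assuming `SessionConfig::with_sort_spill_reservation_bytes` is still the right setter; the helper name and the 1 MB value are purely illustrative, and I did not need this for the run described below):

   ```rust
   use datafusion::prelude::*;

   // Sketch only: build a SessionConfig with a smaller per-partition sort
   // reservation than the default (which is on the order of 10 MB).
   fn config_with_smaller_sort_reservation() -> SessionConfig {
       SessionConfig::new().with_sort_spill_reservation_bytes(1024 * 1024)
   }
   ```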
   
   However, when I cranked the target partitions down to 2 and the memory limit to 50MB, it did succeed, and thus I believe this issue can now be closed (there is some fixed amount of memory required per target partition, but once that is satisfied the repro works).
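   For reference, here is a minimal sketch of the configuration that succeeded for me (2 target partitions and a 50MB fair spill pool). It is illustrative only; the helper name is made up, but it uses the same builder APIs as the reproducer below:

   ```rust
   use std::sync::Arc;

   use datafusion::error::DataFusionError;
   use datafusion::execution::memory_pool::FairSpillPool;
   use datafusion::execution::runtime_env::RuntimeEnvBuilder;
   use datafusion::execution::session_state::SessionStateBuilder;
   use datafusion::prelude::*;

   // Sketch only: a SessionContext with a 50MB fair spill pool and 2 target
   // partitions, which kept the fixed per-partition sort overhead under the
   // memory limit in my run.
   fn context_with_small_pool() -> Result<SessionContext, DataFusionError> {
       let runtime_env = RuntimeEnvBuilder::new()
           .with_memory_pool(Arc::new(FairSpillPool::new(50 * 1024 * 1024)))
           .build_arc()?;

       let config = SessionConfig::new().with_target_partitions(2);

       let state = SessionStateBuilder::new()
           .with_config(config)
           .with_runtime_env(runtime_env)
           .build();

       Ok(SessionContext::new_with_state(state))
   }
   ```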
   
   <details><summary>Updated</summary>
   <p>
   
   ```rust
   // DataFusion spilling sort benchmark / example
   // The idea is to replicate a report from
   // https://discord.com/channels/885562378132000778/1166447479609376850/1276137008435298335
   // where sort doesn't spill
   
   // Related link: sorting strings
   
   use datafusion::error::DataFusionError;
   use datafusion::prelude::*;
   
   use std::ops::Range;
   use std::sync::Arc;
   
   use arrow::array::{
       Decimal128Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
       TimestampNanosecondBuilder, UInt16Builder,
   };
   use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
   use arrow::record_batch::RecordBatch;
   use datafusion::execution::disk_manager::DiskManagerBuilder;
   use datafusion::execution::memory_pool::FairSpillPool;
   use datafusion::execution::runtime_env::RuntimeEnvBuilder;
   use datafusion::execution::session_state::SessionStateBuilder;
   use rand::prelude::StdRng;
   use rand::{Rng, SeedableRng};
   
   #[tokio::main]
   async fn main() -> Result<(), DataFusionError> {
       // initialize logging to see DataFusion's internal logging
       std::env::set_var("RUST_LOG", "trace");
       env_logger::init();
   
       // how much data to sort
       let row_limit = 10 * 1000;
       let mem_limit = 10 * 1024 * 1024;
       let print_results = false;
   
       let pool = FairSpillPool::new(mem_limit);
   
       let runtime_env = RuntimeEnvBuilder::new()
           .with_memory_pool(Arc::new(pool))
           .with_disk_manager_builder(DiskManagerBuilder::default())
           .build()
           .unwrap();
   
       let builder = SessionStateBuilder::new().with_runtime_env(Arc::new(runtime_env));
   
       // TODO add docs to SessionContext with pointer to SessionStateBuilder
       let ctx = SessionContext::new_with_state(builder.build());
   
       let generator = AccessLogGenerator::new()
           .with_row_limit(row_limit)
           .with_max_batch_size(100); // 100 rows per batch
   
       // create a plan that simply sorts on the hostname
       let df = ctx
           .read_batches(generator)?
           .sort(vec![col("host").sort(true, true)])?;
   
       // execute the plan (it should succeed)
       let results: Vec<RecordBatch> = df.collect().await?;
   
       // format the results if interested
       // The data looks like this (wide example table elided); the columns are:
       //   service | host | pod | container | image | time | client_addr |
       //   request_duration_ns | request_user_agent | request_method | request_host |
       //   request_bytes | response_bytes | response_status | decimal_price
       // with host values like 'i-1ec3ca3151468928.ec2.internal'
       if print_results {
           let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?.to_string();
           println!("{}", pretty_results);
       }
       Ok(())
   }
   
   // From data_gen.rs in DataFusion
   
   // Licensed to the Apache Software Foundation (ASF) under one
   // or more contributor license agreements.  See the NOTICE file
   // distributed with this work for additional information
   // regarding copyright ownership.  The ASF licenses this file
   // to you under the Apache License, Version 2.0 (the
   // "License"); you may not use this file except in compliance
   // with the License.  You may obtain a copy of the License at
   //
   //   http://www.apache.org/licenses/LICENSE-2.0
   //
   // Unless required by applicable law or agreed to in writing,
   // software distributed under the License is distributed on an
   // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   // KIND, either express or implied.  See the License for the
   // specific language governing permissions and limitations
   // under the License.
   
   #[derive(Debug, Clone)]
   struct GeneratorOptions {
       row_limit: usize,
       pods_per_host: Range<usize>,
       containers_per_pod: Range<usize>,
       entries_per_container: Range<usize>,
   }
   
   impl Default for GeneratorOptions {
       fn default() -> Self {
           Self {
               row_limit: usize::MAX,
               pods_per_host: 1..15,
               containers_per_pod: 1..3,
               entries_per_container: 1024..8192,
           }
       }
   }
   
   /// Creates access log like entries
   #[derive(Default)]
   struct BatchBuilder {
       service: StringDictionaryBuilder<Int32Type>,
       host: StringDictionaryBuilder<Int32Type>,
       pod: StringDictionaryBuilder<Int32Type>,
       container: StringDictionaryBuilder<Int32Type>,
       image: StringDictionaryBuilder<Int32Type>,
       time: TimestampNanosecondBuilder,
       client_addr: StringBuilder,
       request_duration: Int32Builder,
       request_user_agent: StringBuilder,
       request_method: StringBuilder,
       request_host: StringBuilder,
       request_bytes: Int32Builder,
       response_bytes: Int32Builder,
       response_status: UInt16Builder,
       prices_status: Decimal128Builder,
   
       options: GeneratorOptions,
       row_count: usize,
   }
   
   impl BatchBuilder {
       fn schema() -> SchemaRef {
           let utf8_dict =
               || DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
   
           Arc::new(Schema::new(vec![
               Field::new("service", utf8_dict(), true),
               Field::new("host", utf8_dict(), false),
               Field::new("pod", utf8_dict(), false),
               Field::new("container", utf8_dict(), false),
               Field::new("image", utf8_dict(), false),
               Field::new(
                   "time",
                   DataType::Timestamp(TimeUnit::Nanosecond, None),
                   false,
               ),
               Field::new("client_addr", DataType::Utf8, true),
               Field::new("request_duration_ns", DataType::Int32, false),
               Field::new("request_user_agent", DataType::Utf8, true),
               Field::new("request_method", DataType::Utf8, true),
               Field::new("request_host", DataType::Utf8, true),
               Field::new("request_bytes", DataType::Int32, true),
               Field::new("response_bytes", DataType::Int32, true),
               Field::new("response_status", DataType::UInt16, false),
               Field::new("decimal_price", DataType::Decimal128(38, 0), false),
           ]))
       }
   
       fn is_finished(&self) -> bool {
           self.options.row_limit <= self.row_count
       }
   
       fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) {
           let num_pods = rng.gen_range(self.options.pods_per_host.clone());
           let pods = generate_sorted_strings(rng, num_pods, 30..40);
           for pod in pods {
               let num_containers = rng.gen_range(self.options.containers_per_pod.clone());
               for container_idx in 0..num_containers {
                   let container = format!("{service}_container_{container_idx}");
                   let image = format!(
                       "{container}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9"
                   );
   
                   let num_entries = rng.gen_range(self.options.entries_per_container.clone());
                   for i in 0..num_entries {
                       if self.is_finished() {
                           return;
                       }
   
                       let time = i as i64 * 1024;
                       self.append_row(rng, host, &pod, service, &container, &image, time);
                   }
               }
           }
       }
   
       #[allow(clippy::too_many_arguments)]
       fn append_row(
           &mut self,
           rng: &mut StdRng,
           host: &str,
           pod: &str,
           service: &str,
           container: &str,
           image: &str,
           time: i64,
       ) {
           self.row_count += 1;
   
           let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"];
           let status = &[200, 204, 400, 503, 403];
   
           self.service.append(service).unwrap();
           self.host.append(host).unwrap();
           self.pod.append(pod).unwrap();
           self.container.append(container).unwrap();
           self.image.append(image).unwrap();
           self.time.append_value(time);
   
           self.client_addr.append_value(format!(
               "{}.{}.{}.{}",
               rng.gen::<u8>(),
               rng.gen::<u8>(),
               rng.gen::<u8>(),
               rng.gen::<u8>()
           ));
           self.request_duration.append_value(rng.gen());
           self.request_user_agent
               .append_value(random_string(rng, 20..100));
           self.request_method
               .append_value(methods[rng.gen_range(0..methods.len())]);
           self.request_host
               .append_value(format!("https://{service}.mydomain.com";));
   
           self.request_bytes
               .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
           self.response_bytes
               .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
           self.response_status
               .append_value(status[rng.gen_range(0..status.len())]);
           self.prices_status.append_value(self.row_count as i128);
       }
   
       fn finish(mut self, schema: SchemaRef) -> RecordBatch {
           RecordBatch::try_new(
               schema,
               vec![
                   Arc::new(self.service.finish()),
                   Arc::new(self.host.finish()),
                   Arc::new(self.pod.finish()),
                   Arc::new(self.container.finish()),
                   Arc::new(self.image.finish()),
                   Arc::new(self.time.finish()),
                   Arc::new(self.client_addr.finish()),
                   Arc::new(self.request_duration.finish()),
                   Arc::new(self.request_user_agent.finish()),
                   Arc::new(self.request_method.finish()),
                   Arc::new(self.request_host.finish()),
                   Arc::new(self.request_bytes.finish()),
                   Arc::new(self.response_bytes.finish()),
                   Arc::new(self.response_status.finish()),
                   Arc::new(
                       self.prices_status
                           .finish()
                           .with_precision_and_scale(38, 0)
                           .unwrap(),
                   ),
               ],
           )
           .unwrap()
       }
   }
   
   fn random_string(rng: &mut StdRng, len_range: Range<usize>) -> String {
       let len = rng.gen_range(len_range);
       (0..len)
           .map(|_| rng.gen_range(b'a'..=b'z') as char)
           .collect::<String>()
   }
   
   fn generate_sorted_strings(rng: &mut StdRng, count: usize, str_len: Range<usize>) -> Vec<String> {
       let mut strings: Vec<_> = (0..count)
           .map(|_| random_string(rng, str_len.clone()))
           .collect();
   
       strings.sort_unstable();
       strings
   }
   
   /// Iterator that generates sorted [`RecordBatch`]es of randomly generated data
   /// with an access log style schema for tracing or monitoring type use cases.
   ///
   /// This is useful for writing test queries on such data.
   ///
   /// Here are the columns with example data:
   ///
   /// ```text
   /// service:             'backend'
   /// host:                'i-1ec3ca3151468928.ec2.internal'
   /// pod:                 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
   /// container:           'backend_container_0'
   /// image:               'backend_container_0@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9'
   /// time:                '1970-01-01 00:00:00'
   /// client_addr:         '127.216.178.64'
   /// request_duration_ns: -1261239112
   /// request_user_agent:  'kxttrfiiietlsaygzphhwlqcgngnumuphliejmxfdznuurswhdcicrlprbnocibvsbukiohjjbjdygwbfhxqvurm'
   /// request_method:      'PUT'
   /// request_host:        'https://backend.mydomain.com'
   /// request_bytes:       -312099516
   /// response_bytes:      1448834362
   /// response_status:     200
   /// ```
   #[derive(Debug)]
   pub struct AccessLogGenerator {
       schema: SchemaRef,
       rng: StdRng,
       host_idx: usize,
       /// maximum rows per batch
       max_batch_size: usize,
       /// How many rows have been returned so far
       row_count: usize,
       /// Options
       options: GeneratorOptions,
   }
   
   impl Default for AccessLogGenerator {
       fn default() -> Self {
           Self::new()
       }
   }
   
   impl AccessLogGenerator {
       pub fn new() -> Self {
           let seed = [
               1, 0, 0, 0, 23, 0, 3, 0, 200, 1, 0, 0, 210, 30, 8, 0, 1, 0, 21, 0, 6, 0, 0, 0,
               0, 0, 5, 0, 0, 0, 0, 0,
           ];
   
           Self {
               schema: BatchBuilder::schema(),
               host_idx: 0,
               rng: StdRng::from_seed(seed),
               max_batch_size: usize::MAX,
               row_count: 0,
               options: Default::default(),
           }
       }
   
       /// Return the schema of the [`RecordBatch`]es created
       pub fn schema(&self) -> SchemaRef {
           self.schema.clone()
       }
   
       /// Limit the maximum batch size
       pub fn with_max_batch_size(mut self, batch_size: usize) -> Self {
           self.max_batch_size = batch_size;
           self
       }
   
       /// Return up to `row_limit` rows
       pub fn with_row_limit(mut self, row_limit: usize) -> Self {
           self.options.row_limit = row_limit;
           self
       }
   
       /// Set the number of pods per host
       pub fn with_pods_per_host(mut self, range: Range<usize>) -> Self {
           self.options.pods_per_host = range;
           self
       }
   
       /// Set the number of containers per pod
       pub fn with_containers_per_pod(mut self, range: Range<usize>) -> Self {
           self.options.containers_per_pod = range;
           self
       }
   
       /// Set the number of log entries per container
       pub fn with_entries_per_container(mut self, range: Range<usize>) -> Self {
           self.options.entries_per_container = range;
           self
       }
   }
   
   impl Iterator for AccessLogGenerator {
       type Item = RecordBatch;
   
       fn next(&mut self) -> Option<Self::Item> {
           if self.row_count == self.options.row_limit {
               return None;
           }
   
           let row_limit = self
               .max_batch_size
               .min(self.options.row_limit - self.row_count);
   
           let mut builder = BatchBuilder {
               options: GeneratorOptions {
                   row_limit,
                   ..self.options.clone()
               },
               ..Default::default()
           };
   
           let host = format!(
               "i-{:016x}.ec2.internal",
               self.host_idx * 0x7d87f8ed5c5 + 0x1ec3ca3151468928
           );
           self.host_idx += 1;
   
           for service in &["frontend", "backend", "database", "cache"] {
               if self.rng.gen_bool(0.5) {
                   continue;
               }
               if builder.is_finished() {
                   break;
               }
               builder.append(&mut self.rng, &host, service);
           }
   
           let batch = builder.finish(Arc::clone(&self.schema));
   
           self.row_count += batch.num_rows();
           Some(batch)
       }
   }
   
   ```
   
   </p>
   </details> 

