Re: [C++] Thread deadlock in ObjectOutputStream

2024-05-29 Thread Li Jin
Hi Antoine,

Thanks very much for the reply! I did create a GitHub issue yesterday:

#41862

On Wed, May 29, 2024 at 10:24 AM Antoine Pitrou  wrote:

>
> Hi Li!
>
> Sorry for the delay.
> It seems the problem lies here:
>
> https://github.com/apache/arrow/blob/9f5899019d23b2b1eae2fedb9f6be8827885d843/cpp/src/arrow/filesystem/s3fs.cc#L1858
>
> The Future is marked finished with the ObjectOutputStream's mutex taken,
> and the Future's callback then triggers a chain of event which leads to
> calling the ObjectOutputStream destructor, which in turn tries to take
> the lock.
>
> Can you open a GH issue and we can follow up there?
>
> Regards
>
> Antoine.
>
>
> Le 23/05/2024 à 21:23, Li Jin a écrit :
> > Hello,
> >
> > I am seeing a deadlock when destructing an ObjectOutputStream. I have
> > attached the stack trace.
> >
> > I did some debugging and found that the issue seems to be that the mutex
> > in question is already held by this thread (I checked the __owner field
> > in the pthread_mutex_t which points to the hanging thread)
> >
> > Unfortunately the stack trace doesn’t show exactly which mutex it is
> > trying to lock. I wonder if someone more familiar with the IO code has
> > some ideas what might be the issue and where to dig deeper?
> >
> > Appreciate the help,
> > Li
> >
> >
> >
>
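
To make the failure mode concrete, here is a tiny analogy in Python, using a non-reentrant threading.Lock to stand in for the std::mutex in s3fs.cc; the class and method names are made up for illustration and are not Arrow APIs:

```python
import threading

class ToyOutputStream:
    """Stand-in for ObjectOutputStream: close() takes the same non-reentrant
    lock that is still held while the upload future's callbacks run."""

    def __init__(self):
        self._lock = threading.Lock()     # like std::mutex: not reentrant
        self._on_finished = []            # callbacks attached to the "future"

    def close(self):
        with self._lock:                  # the Close()/destructor path takes the lock
            pass                          # flush parts, release resources, ...

    def finish_upload(self):
        with self._lock:
            # Problem pattern: the future is marked finished while the lock is
            # still held, so its callbacks run under the lock ...
            for cb in self._on_finished:
                cb()

stream = ToyOutputStream()
# ... and if a callback ends up dropping the last reference to the stream,
# the close-from-destructor path re-takes the same lock on the same thread:
stream._on_finished.append(stream.close)
# stream.finish_upload()   # uncommenting this line self-deadlocks
```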


[C++] Thread deadlock in ObjectOutputStream

2024-05-23 Thread Li Jin
Hello,

I am seeing a deadlock when destructing an ObjectOutputStream. I have
attached the stack trace.

I did some debugging and found that the issue seems to be that the mutex in
question is already held by this thread (I checked the __owner field in the
pthread_mutex_t, which points to the hanging thread).

Unfortunately the stack trace doesn’t show exactly which mutex it is trying
to lock. I wonder if someone more familiar with the IO code has some ideas
about what might be the issue and where to dig deeper?

Appreciate the help,
Li
Thread 39 (Thread 0xe2199eee700 (LWP 1392) "python3.10"):
#0  __lll_lock_wait (futex=futex@entry=0xe2158016c60, private=0) at 
lowlevellock.c:52
#1  0x0e223fe14843 in __GI___pthread_mutex_lock (mutex=0xe2158016c60) at 
../nptl/pthread_mutex_lock.c:80
#2  0x0e223a4c7be3 in virtual thunk to arrow::fs::(anonymous 
namespace)::ObjectOutputStream::Close() () at 
/build/build/ext/public/apache/arrow/15/0/0/apache-arrow/cpp/src/arrow/status.h:140
#3  0x0e223993eaef in arrow::io::internal::CloseFromDestructor 
(file=file@entry=0xe2158005c80) at 
/build/build/ext/public/apache/arrow/15/0/0/apache-arrow/cpp/src/arrow/io/interfaces.cc:284
#4  0x0e223a4b9250 in arrow::fs::(anonymous 
namespace)::ObjectOutputStream::~ObjectOutputStream (this=0xe2158005b40, 
__in_chrg=, __vtt_parm=) at 
/build/build/ext/public/apache/arrow/15/0/0/apache-arrow/cpp/src/arrow/filesystem/s3fs.cc:1398
#5  __gnu_cxx::new_allocator::destroy (__p=0xe2158005b40, this=0xe2158005b40) at 
/build/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/ext/new_allocator.h:168
#6  std::allocator_traits >::destroy (__p=0xe2158005b40, __a=...) at 
/build/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/alloc_traits.h:535
#7  std::_Sp_counted_ptr_inplace, (__gnu_cxx::_Lock_policy)2>::_M_dispose 
(this=0xe2158005b30) at 
/build/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/shared_ptr_base.h:528
#8  0x0e223c41ddda in 
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release 
(this=0xe2158005b30) at 
/build/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/shared_ptr_base.h:168
#9  0x0e223bbb62a8 in 
std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count 
(this=0xe21580f2b40, __in_chrg=) at 
/tmp/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/shared_ptr_base.h:705
#10 std::__shared_ptr::~__shared_ptr (this=0xe21580f2b38, 
__in_chrg=) at 
/tmp/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/shared_ptr_base.h:1154
#11 std::shared_ptr::~shared_ptr (this=0xe21580f2b38, 
__in_chrg=) at 
/tmp/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/shared_ptr.h:122
#12 arrow::dataset::FileWriter::~FileWriter (this=0xe21580f2b10, 
__in_chrg=) at 
/tmp/build/ts/arrow/dataset/c/src/arrow/dataset/file_base.h:378
#13 arrow::dataset::ParquetFileWriter::~ParquetFileWriter (this=0xe21580f2b10, 
__in_chrg=) at 
/tmp/build/ts/arrow/dataset/c/src/arrow/dataset/file_parquet.h:282
#14 arrow::dataset::ParquetFileWriter::~ParquetFileWriter (this=0xe21580f2b10, 
__in_chrg=) at 
/tmp/build/ts/arrow/dataset/c/src/arrow/dataset/file_parquet.h:282
#15 0x0e223c41ddda in 
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release 
(this=0xe21580b9a10) at 
/build/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/shared_ptr_base.h:168
#16 0x0e223bae3f1c in 
std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count 
(this=0x5b665b834690, __in_chrg=) at 
/tmp/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/shared_ptr_base.h:705
#17 std::__shared_ptr::~__shared_ptr (this=0x5b665b834688, 
__in_chrg=) at 
/tmp/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/shared_ptr_base.h:1154
#18 std::shared_ptr::~shared_ptr 
(this=0x5b665b834688, __in_chrg=) at 
/tmp/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/shared_ptr.h:122
#19 arrow::dataset::internal::(anonymous 
namespace)::DatasetWriterFileQueue::~DatasetWriterFileQueue 
(this=0x5b665b834670, __in_chrg=) at 
/tmp/build/ts/arrow/dataset/c/src/arrow/dataset/dataset_writer.cc:140
#20 std::default_delete::operator() (__ptr=0x5b665b834670, 
this=) at 
/tmp/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/unique_ptr.h:85
#21 std::unique_ptr >::~unique_ptr (this=0x5b665b833800, 
__in_chrg=) at 
/tmp/build/ext/public/gpl3/gnu/gcc/11/dist/include/c++/11.3.0/bits/unique_ptr.h:361
#22 ~ (this=0x5b665b8337f8, __in_chrg=) at 
/tmp/build/ts/arrow/dataset/c/src/arrow/dataset/dataset_writer.cc:364
#23 
arrow::internal::FnOnce::FnImpl >::~FnImpl (this=0x5b665b8337f0, __in_chrg=) at 
/tmp/build/ext/public/apache/arrow/15/0/0/dist/include/arrow/util/functional.h:150
#24 
arrow::internal::FnOnce::FnImpl >::~FnImpl(void) (this=0x5b665b8337f0, 
__in_chrg=) at 
/tmp/build/ext/public/apache/arrow/15/0/0/dist/include/arrow/util/functional.h:150
#25 0x0e223995cd6e in 
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release 
(this=0x5b665

Re: Arrow 15 parquet nanosecond change

2024-02-21 Thread Li Jin
Thank you!

Will give this a shot.

On Wed, Feb 21, 2024 at 5:10 PM Dane Pitkin 
wrote:

> It is possible to change the default Parquet version when instantiating
> PyArrow's ParquetWriter[1]. Here's the PR[2] that upgraded the default
> Parquet format version from 2.4 -> 2.6, which contains nanosecond support.
> It was released in Arrow v13.
>
> [1]
>
> https://github.com/apache/arrow/blob/e198f309c577de9a265c04af2bc4644c33f54375/python/pyarrow/parquet/core.py#L953
>
> [2]https://github.com/apache/arrow/pull/36137
>
> On Wed, Feb 21, 2024 at 4:15 PM Li Jin  wrote:
>
> > “Exponentially exposed” -> “potentially exposed”
> >
> > On Wed, Feb 21, 2024 at 4:13 PM Li Jin  wrote:
> >
> > > Thanks - since we don’t control all the invocation of pq.write_table, I
> > > wonder if there is some configuration for the “default” behavior?
> > >
> > > Also I wonder if there are other API surface that is exponentially
> > exposed
> > > to this, e.g., dataset or pd.Dataframe.to_parquet ?
> > >
> > > Thanks!
> > > Li
> > >
> > > On Wed, Feb 21, 2024 at 3:53 PM Jacek Pliszka  >
> > > wrote:
> > >
> > >> Hi!
> > >>
> > >> pq.write_table(
> > >> table, config.output_filename, coerce_timestamps="us",
> > >> allow_truncated_timestamps=True,
> > >> )
> > >>
> > >> allows you to write as us instead of ns.
> > >>
> > >> BR
> > >>
> > >> J
> > >>
> > >>
> > >> śr., 21 lut 2024 o 21:44 Li Jin  napisał(a):
> > >>
> > >> > Hi,
> > >> >
> > >> > My colleague has informed me that during the Arrow 12->15 upgrade,
> he
> > >> found
> > >> > that writing a pandas Dataframe with datetime64[ns] to parquet will
> > >> result
> > >> > in nanosecond metadata and nanosecond values.
> > >> >
> > >> > I wonder if this is something configurable to the old behavior so we
> > can
> > >> > enable “nanosecond in parquet” gradually? There are code that reads
> > >> parquet
> > >> > files that don’t handle parquet nanosecond now.
> > >> >
> > >> > Thanks!
> > >> > Li
> > >> >
> > >>
> > >
> >
>
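
For reference, a minimal sketch combining the two suggestions above (pin the pre-Arrow-13 default of Parquet format 2.4 and coerce timestamps to microseconds); the file name is a placeholder, and the parameters are the ones documented for pq.write_table/ParquetWriter:

```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({"ts": pd.to_datetime(["2024-02-21 12:00:00"])})  # datetime64[ns]
table = pa.Table.from_pandas(df)

# Keep the pre-Arrow-13 behavior: format 2.4 has no nanosecond timestamp type,
# so values are coerced to microseconds and older readers keep working.
pq.write_table(table, "out.parquet", version="2.4",
               coerce_timestamps="us", allow_truncated_timestamps=True)
```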


Re: Arrow 15 parquet nanosecond change

2024-02-21 Thread Li Jin
“Exponentially exposed” -> “potentially exposed”

On Wed, Feb 21, 2024 at 4:13 PM Li Jin  wrote:

> Thanks - since we don’t control all the invocation of pq.write_table, I
> wonder if there is some configuration for the “default” behavior?
>
> Also I wonder if there are other API surface that is exponentially exposed
> to this, e.g., dataset or pd.Dataframe.to_parquet ?
>
> Thanks!
> Li
>
> On Wed, Feb 21, 2024 at 3:53 PM Jacek Pliszka 
> wrote:
>
>> Hi!
>>
>> pq.write_table(
>> table, config.output_filename, coerce_timestamps="us",
>> allow_truncated_timestamps=True,
>> )
>>
>> allows you to write as us instead of ns.
>>
>> BR
>>
>> J
>>
>>
>> śr., 21 lut 2024 o 21:44 Li Jin  napisał(a):
>>
>> > Hi,
>> >
>> > My colleague has informed me that during the Arrow 12->15 upgrade, he
>> found
>> > that writing a pandas Dataframe with datetime64[ns] to parquet will
>> result
>> > in nanosecond metadata and nanosecond values.
>> >
>> > I wonder if this is something configurable to the old behavior so we can
>> > enable “nanosecond in parquet” gradually? There are code that reads
>> parquet
>> > files that don’t handle parquet nanosecond now.
>> >
>> > Thanks!
>> > Li
>> >
>>
>


Re: Arrow 15 parquet nanosecond change

2024-02-21 Thread Li Jin
Thanks - since we don’t control all the invocations of pq.write_table, I
wonder if there is some configuration for the “default” behavior?

Also I wonder if there are other API surfaces that are exponentially exposed
to this, e.g., dataset or pd.DataFrame.to_parquet?

Thanks!
Li

On Wed, Feb 21, 2024 at 3:53 PM Jacek Pliszka 
wrote:

> Hi!
>
> pq.write_table(
> table, config.output_filename, coerce_timestamps="us",
> allow_truncated_timestamps=True,
> )
>
> allows you to write as us instead of ns.
>
> BR
>
> J
>
>
> śr., 21 lut 2024 o 21:44 Li Jin  napisał(a):
>
> > Hi,
> >
> > My colleague has informed me that during the Arrow 12->15 upgrade, he
> found
> > that writing a pandas Dataframe with datetime64[ns] to parquet will
> result
> > in nanosecond metadata and nanosecond values.
> >
> > I wonder if this is something configurable to the old behavior so we can
> > enable “nanosecond in parquet” gradually? There are code that reads
> parquet
> > files that don’t handle parquet nanosecond now.
> >
> > Thanks!
> > Li
> >
>


Arrow 15 parquet nanosecond change

2024-02-21 Thread Li Jin
Hi,

My colleague has informed me that during the Arrow 12->15 upgrade, he found
that writing a pandas Dataframe with datetime64[ns] to parquet will result
in nanosecond metadata and nanosecond values.

I wonder if this is something configurable to the old behavior so we can
enable “nanosecond in parquet” gradually? There are code that reads parquet
files that don’t handle parquet nanosecond now.

Thanks!
Li


Re: [ANNOUNCE] New Arrow PMC chair: Andy Grove

2023-11-28 Thread Li Jin
 Congrats Andy!

On Tue, Nov 28, 2023 at 3:25 PM Weston Pace  wrote:

> Congrats Andy!
>
> On Mon, Nov 27, 2023, 7:31 PM wish maple  wrote:
>
> > Congrats Andy!
> >
> > Best,
> > Xuwei Fu
> >
> > Andrew Lamb  于2023年11月27日周一 20:47写道:
> >
> > > I am pleased to announce that the Arrow Project has a new PMC chair and
> > VP
> > > as per our tradition of rotating the chair once a year. I have resigned
> > and
> > > Andy Grove was duly elected by the PMC and approved unanimously by the
> > > board.
> > >
> > > Please join me in congratulating Andy Grove!
> > >
> > > Thanks,
> > > Andrew
> > >
> >
>


Re: C++: Code that read parquet into Arrow Arrays?

2023-11-19 Thread Li Jin
Ty!

On Fri, Nov 17, 2023 at 9:12 PM wish maple  wrote:

> Hi,
>
> The Parquet read path is divided into an arrow part and a parquet part.
>
> 1. The lowest layer of the parquet part is the Parquet decoder, in [1].
> Floating-point columns might use PLAIN, RLE_DICTIONARY or BYTE_STREAM_SPLIT
> encoding.
> 2. parquet::ColumnReader sits above the decoder; each row group might have
> one or two encodings for a column (if dictionary encoding is chosen and it
> falls back to plain, there are two encodings in a RowGroup for that column).
> This is in [2].
>
> Other modules are mentioned by Bryce.
>
> Best,
> Xuwei Fu
>
> [1] https://github.com/apache/arrow/blob/main/cpp/src/parquet/encoding.cc
> [2]
> https://github.com/apache/arrow/blob/main/cpp/src/parquet/column_reader.cc
>
> Li Jin  于2023年11月18日周六 05:27写道:
>
> > Hi,
> >
> > I am recently investigating a null/nan issue with Parquet and Arrow and
> > wonder if someone can give me a pointer to the code that decodes Parquet
> > row group into Arrow float/double arrays?
> >
> > Thanks,
> > Li
> >
>
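
For orientation, a small PyArrow snippet that exercises this decode path end to end (the file and column names are just examples); reading a single row group goes through parquet::ColumnReader and the decoders in encoding.cc:

```python
import pyarrow.compute as pc
import pyarrow.parquet as pq

pf = pq.ParquetFile("data.parquet")          # example file
tbl = pf.read_row_group(0, columns=["x"])    # decodes one row group into Arrow arrays
col = tbl.column("x")                        # ChunkedArray of float/double values
print(col.type, col.null_count)              # nulls come from definition levels
print(pc.is_nan(col.chunk(0)))               # NaNs are ordinary float values, not nulls
```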


Re: C++: Code that read parquet into Arrow Arrays?

2023-11-17 Thread Li Jin
Thank you!

On Fri, Nov 17, 2023 at 5:40 PM Bryce Mecum  wrote:

> Hi Li, I think what you're after is ColumnReaderImpl::NextBatch [1]
> which looks like it eventually calls TransferZeroCopy [2] in the case
> of primitive types like float/double (amongst others).
>
> [1]
> https://github.com/apache/arrow/blob/main/cpp/src/parquet/arrow/reader.cc#L107
> [2]
> https://github.com/apache/arrow/blob/main/cpp/src/parquet/arrow/reader_internal.cc#L345
>
> On Fri, Nov 17, 2023 at 12:27 PM Li Jin  wrote:
> >
> > Hi,
> >
> > I am recently investigating a null/nan issue with Parquet and Arrow and
> > wonder if someone can give me a pointer to the code that decodes Parquet
> > row group into Arrow float/double arrays?
> >
> > Thanks,
> > Li
>
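
When chasing a null/NaN discrepancy it can also help to compare against the Parquet column statistics before any decoding happens; a rough sketch, again with a placeholder file name:

```python
import pyarrow.parquet as pq

md = pq.ParquetFile("data.parquet").metadata   # footer only, no column data
col = md.row_group(0).column(0)
stats = col.statistics
if stats is not None:
    if stats.has_null_count:
        # null_count reflects definition levels; NaN values are not counted as nulls
        print(col.path_in_schema, "nulls:", stats.null_count)
    if stats.has_min_max:
        print(col.path_in_schema, "min/max:", stats.min, stats.max)
```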


C++: Code that read parquet into Arrow Arrays?

2023-11-17 Thread Li Jin
Hi,

I am recently investigating a null/nan issue with Parquet and Arrow and
wonder if someone can give me a pointer to the code that decodes Parquet
row group into Arrow float/double arrays?

Thanks,
Li


Re: [C++] Potential cache/memory leak when reading parquet

2023-09-08 Thread Li Jin
Sorry, I realized my previous email might have had the wrong format. Resending
with the correct format.

Update:

I have done memory profiling and the result seems to suggest a memory leak. I
have opened an issue to further discuss this:
https://github.com/apache/arrow/issues/37630



On Fri, Sep 8, 2023 at 10:07 AM Li Jin  wrote:

> Update:
>
> I have done a memory profiling and the result seems to suggest memory
> leak. I
>  have opened a issue to further discuss this:
> https://github.com/apache/arrow/issues/37630
>
>
> On Fri, Sep 8, 2023 at 10:04 AM Li Jin  wrote:
>
>> Update:
>>
>> I have done a memory profiling and the result seems to suggest memory
>> leak. I
>>  have opened a issue to further discuss this:
>> https://github.com/apache/arrow/issues/37630
>>
>> Attaching the memory profiling result here as well:
>>
>> On Wed, Sep 6, 2023 at 9:18 PM Gang Wu  wrote:
>>
>>> As suggested from other comments, I also highly recommend using a
>>> heap profiling tool to investigate what's going on there.
>>>
>>> BTW, 800 columns look suspicious to me. Could you try to test them
>>> without reading any batch? Not sure if the file metadata is the root
>>> cause. Or you may want to try another dataset with a smaller number
>>> of columns.
>>>
>>> On Thu, Sep 7, 2023 at 5:45 AM Li Jin  wrote:
>>>
>>> > Correction:
>>> >
>>> > > I tried with both Antione's suggestions (swapping the default
>>> allocator
>>> > and calls ReleaseUnused but neither seem to affect the max rss.
>>> >
>>> > Calling ReleaseUnused does have some effect on the rss - the max rss
>>> goes
>>> > from ~6G -> 5G but still there seems to be something else.
>>> >
>>> > On Wed, Sep 6, 2023 at 4:35 PM Li Jin  wrote:
>>> >
>>> > > Also attaching my experiment code just in case:
>>> > > https://gist.github.com/icexelloss/88195de046962e1d043c99d96e1b8b43
>>> > >
>>> > > On Wed, Sep 6, 2023 at 4:29 PM Li Jin  wrote:
>>> > >
>>> > >> Reporting back with some new findings.
>>> > >>
>>> > >> Re Felipe and Antione:
>>> > >> I tried with both Antione's suggestions (swapping the default
>>> allocator
>>> > >> and calls ReleaseUnused but neither seem to affect the max rss. In
>>> > >> addition, I manage to repro the issue by reading a list of n local
>>> > parquet
>>> > >> files that point to the same file, i.e., {"a.parquet", "a.parquet",
>>> ...
>>> > }.
>>> > >> I am also able to crash my process by reading and passing a large
>>> > enough n.
>>> > >> (I observed rss keep going up and eventually the process gets
>>> killed).
>>> > This
>>> > >> observation led me to think there might actually be some memory leak
>>> > issues.
>>> > >>
>>> > >> Re Xuwei:
>>> > >> Thanks for the tips. I am gonna try to memorize this profile next
>>> and
>>> > see
>>> > >> what I can find.
>>> > >>
>>> > >> I am gonna keep looking into this but again, any ideas /
>>> suggestions are
>>> > >> appreciated (and thanks for all the help so far!)
>>> > >>
>>> > >> Li
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >> On Wed, Sep 6, 2023 at 1:59 PM Li Jin 
>>> wrote:
>>> > >>
>>> > >>> Thanks all for the additional suggestions. Will try it but want to
>>> > >>> answer Antoine's question first:
>>> > >>>
>>> > >>> > Which leads to the question: what is your OS?
>>> > >>>
>>> > >>> I am testing this on Debian 5.4.228 x86_64 GNU/Linux
>>> > >>>
>>> > >>> On Wed, Sep 6, 2023 at 1:31 PM wish maple 
>>> > >>> wrote:
>>> > >>>
>>> > >>>> By the way, you can try to use a memory-profiler like [1] and [2]
>>> .
>>> > >>>> It would be help to find how the memory is used
>>> > >>>>
>>> > >>>> Best,
>>> > >

Re: [C++] Potential cache/memory leak when reading parquet

2023-09-08 Thread Li Jin
Update:

I have done memory profiling and the result seems to suggest a memory leak. I
have opened an issue to further discuss this:
https://github.com/apache/arrow/issues/37630


On Fri, Sep 8, 2023 at 10:04 AM Li Jin  wrote:

> Update:
>
> I have done a memory profiling and the result seems to suggest memory
> leak. I
>  have opened a issue to further discuss this:
> https://github.com/apache/arrow/issues/37630
>
> Attaching the memory profiling result here as well:
>
> On Wed, Sep 6, 2023 at 9:18 PM Gang Wu  wrote:
>
>> As suggested from other comments, I also highly recommend using a
>> heap profiling tool to investigate what's going on there.
>>
>> BTW, 800 columns look suspicious to me. Could you try to test them
>> without reading any batch? Not sure if the file metadata is the root
>> cause. Or you may want to try another dataset with a smaller number
>> of columns.
>>
>> On Thu, Sep 7, 2023 at 5:45 AM Li Jin  wrote:
>>
>> > Correction:
>> >
>> > > I tried with both Antione's suggestions (swapping the default
>> allocator
>> > and calls ReleaseUnused but neither seem to affect the max rss.
>> >
>> > Calling ReleaseUnused does have some effect on the rss - the max rss
>> goes
>> > from ~6G -> 5G but still there seems to be something else.
>> >
>> > On Wed, Sep 6, 2023 at 4:35 PM Li Jin  wrote:
>> >
>> > > Also attaching my experiment code just in case:
>> > > https://gist.github.com/icexelloss/88195de046962e1d043c99d96e1b8b43
>> > >
>> > > On Wed, Sep 6, 2023 at 4:29 PM Li Jin  wrote:
>> > >
>> > >> Reporting back with some new findings.
>> > >>
>> > >> Re Felipe and Antione:
>> > >> I tried with both Antione's suggestions (swapping the default
>> allocator
>> > >> and calls ReleaseUnused but neither seem to affect the max rss. In
>> > >> addition, I manage to repro the issue by reading a list of n local
>> > parquet
>> > >> files that point to the same file, i.e., {"a.parquet", "a.parquet",
>> ...
>> > }.
>> > >> I am also able to crash my process by reading and passing a large
>> > enough n.
>> > >> (I observed rss keep going up and eventually the process gets
>> killed).
>> > This
>> > >> observation led me to think there might actually be some memory leak
>> > issues.
>> > >>
>> > >> Re Xuwei:
>> > >> Thanks for the tips. I am gonna try to memorize this profile next and
>> > see
>> > >> what I can find.
>> > >>
>> > >> I am gonna keep looking into this but again, any ideas / suggestions
>> are
>> > >> appreciated (and thanks for all the help so far!)
>> > >>
>> > >> Li
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >> On Wed, Sep 6, 2023 at 1:59 PM Li Jin  wrote:
>> > >>
>> > >>> Thanks all for the additional suggestions. Will try it but want to
>> > >>> answer Antoine's question first:
>> > >>>
>> > >>> > Which leads to the question: what is your OS?
>> > >>>
>> > >>> I am testing this on Debian 5.4.228 x86_64 GNU/Linux
>> > >>>
>> > >>> On Wed, Sep 6, 2023 at 1:31 PM wish maple 
>> > >>> wrote:
>> > >>>
>> > >>>> By the way, you can try to use a memory-profiler like [1] and [2] .
>> > >>>> It would be help to find how the memory is used
>> > >>>>
>> > >>>> Best,
>> > >>>> Xuwei Fu
>> > >>>>
>> > >>>> [1]
>> > >>>>
>> https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
>> > >>>> [2] https://google.github.io/tcmalloc/gperftools.html
>> > >>>>
>> > >>>>
>> > >>>> Felipe Oliveira Carvalho  于2023年9月7日周四
>> 00:28写道:
>> > >>>>
>> > >>>> > > (a) stays pretty stable throughout the scan (stays < 1G), (b)
>> > keeps
>> > >>>> > increasing during the scan (looks linear to the number of files
>> > >>>> scanned).
>> > >>>> >
>> > >>

Re: [C++] Potential cache/memory leak when reading parquet

2023-09-06 Thread Li Jin
Correction:

> I tried with both Antione's suggestions (swapping the default allocator
and calls ReleaseUnused but neither seem to affect the max rss.

Calling ReleaseUnused does have some effect on the rss - the max rss goes
from ~6G -> 5G but still there seems to be something else.

On Wed, Sep 6, 2023 at 4:35 PM Li Jin  wrote:

> Also attaching my experiment code just in case:
> https://gist.github.com/icexelloss/88195de046962e1d043c99d96e1b8b43
>
> On Wed, Sep 6, 2023 at 4:29 PM Li Jin  wrote:
>
>> Reporting back with some new findings.
>>
>> Re Felipe and Antione:
>> I tried with both Antione's suggestions (swapping the default allocator
>> and calls ReleaseUnused but neither seem to affect the max rss. In
>> addition, I manage to repro the issue by reading a list of n local parquet
>> files that point to the same file, i.e., {"a.parquet", "a.parquet", ... }.
>> I am also able to crash my process by reading and passing a large enough n.
>> (I observed rss keep going up and eventually the process gets killed). This
>> observation led me to think there might actually be some memory leak issues.
>>
>> Re Xuwei:
>> Thanks for the tips. I am gonna try to memorize this profile next and see
>> what I can find.
>>
>> I am gonna keep looking into this but again, any ideas / suggestions are
>> appreciated (and thanks for all the help so far!)
>>
>> Li
>>
>>
>>
>>
>>
>>
>> On Wed, Sep 6, 2023 at 1:59 PM Li Jin  wrote:
>>
>>> Thanks all for the additional suggestions. Will try it but want to
>>> answer Antoine's question first:
>>>
>>> > Which leads to the question: what is your OS?
>>>
>>> I am testing this on Debian 5.4.228 x86_64 GNU/Linux
>>>
>>> On Wed, Sep 6, 2023 at 1:31 PM wish maple 
>>> wrote:
>>>
>>>> By the way, you can try to use a memory-profiler like [1] and [2] .
>>>> It would be help to find how the memory is used
>>>>
>>>> Best,
>>>> Xuwei Fu
>>>>
>>>> [1]
>>>> https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
>>>> [2] https://google.github.io/tcmalloc/gperftools.html
>>>>
>>>>
>>>> Felipe Oliveira Carvalho  于2023年9月7日周四 00:28写道:
>>>>
>>>> > > (a) stays pretty stable throughout the scan (stays < 1G), (b) keeps
>>>> > increasing during the scan (looks linear to the number of files
>>>> scanned).
>>>> >
>>>> > I wouldn't take this to mean a memory leak but the memory allocator
>>>> not
>>>> > paging out virtual memory that has been allocated throughout the scan.
>>>> > Could you run your workload under a memory profiler?
>>>> >
>>>> > (3) Scan the same dataset twice in the same process doesn't increase
>>>> the
>>>> > max rss.
>>>> >
>>>> > Another sign this isn't a leak, just the allocator reaching a level of
>>>> > memory commitment that it doesn't feel like undoing.
>>>> >
>>>> > --
>>>> > Felipe
>>>> >
>>>> > On Wed, Sep 6, 2023 at 12:56 PM Li Jin  wrote:
>>>> >
>>>> > > Hello,
>>>> > >
>>>> > > I have been testing "What is the max rss needed to scan through
>>>> ~100G of
>>>> > > data in a parquet stored in gcs using Arrow C++".
>>>> > >
>>>> > > The current answer is about ~6G of memory which seems a bit high so
>>>> I
>>>> > > looked into it. What I observed during the process led me to think
>>>> that
>>>> > > there are some potential cache/memory issues in the dataset/parquet
>>>> cpp
>>>> > > code.
>>>> > >
>>>> > > Main observation:
>>>> > > (1) As I am scanning through the dataset, I printed out (a) memory
>>>> > > allocated by the memory pool from ScanOptions (b) process rss. I
>>>> found
>>>> > that
>>>> > > while (a) stays pretty stable throughout the scan (stays < 1G), (b)
>>>> keeps
>>>> > > increasing during the scan (looks linear to the number of files
>>>> scanned).
>>>> > > (2) I tested ScanNode in Arrow as well as an in-house library that
>>>> > > implements its own "S3Dataset" similar to Arrow dataset, both
>>>> showing
>>>> > > similar rss usage. (Which led me to think the issue is more likely
>>>> to be
>>>> > in
>>>> > > the parquet cpp code instead of dataset code).
>>>> > > (3) Scan the same dataset twice in the same process doesn't
>>>> increase the
>>>> > > max rss.
>>>> > >
>>>> > > I plan to look into the parquet cpp/dataset code but I wonder if
>>>> someone
>>>> > > has some clues what the issue might be or where to look at?
>>>> > >
>>>> > > Thanks,
>>>> > > Li
>>>> > >
>>>> >
>>>>
>>>
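
For reference, a quick sketch of the two knobs discussed above (swapping the default allocator and releasing unused memory) as they are exposed in PyArrow; `release_unused` mirrors the C++ MemoryPool::ReleaseUnused, and which backends are available depends on the build:

```python
import pyarrow as pa

print(pa.default_memory_pool().backend_name)   # e.g. jemalloc, mimalloc or system
pa.set_memory_pool(pa.system_memory_pool())    # or set ARROW_DEFAULT_MEMORY_POOL=system

# ... run the scan ...

pool = pa.default_memory_pool()
pool.release_unused()                          # ask the allocator to return free pages
print(pool.bytes_allocated(), pool.max_memory())
```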


Re: [C++] Potential cache/memory leak when reading parquet

2023-09-06 Thread Li Jin
Also attaching my experiment code just in case:
https://gist.github.com/icexelloss/88195de046962e1d043c99d96e1b8b43

On Wed, Sep 6, 2023 at 4:29 PM Li Jin  wrote:

> Reporting back with some new findings.
>
> Re Felipe and Antione:
> I tried with both Antione's suggestions (swapping the default allocator
> and calls ReleaseUnused but neither seem to affect the max rss. In
> addition, I manage to repro the issue by reading a list of n local parquet
> files that point to the same file, i.e., {"a.parquet", "a.parquet", ... }.
> I am also able to crash my process by reading and passing a large enough n.
> (I observed rss keep going up and eventually the process gets killed). This
> observation led me to think there might actually be some memory leak issues.
>
> Re Xuwei:
> Thanks for the tips. I am gonna try to memorize this profile next and see
> what I can find.
>
> I am gonna keep looking into this but again, any ideas / suggestions are
> appreciated (and thanks for all the help so far!)
>
> Li
>
>
>
>
>
>
> On Wed, Sep 6, 2023 at 1:59 PM Li Jin  wrote:
>
>> Thanks all for the additional suggestions. Will try it but want to answer
>> Antoine's question first:
>>
>> > Which leads to the question: what is your OS?
>>
>> I am testing this on Debian 5.4.228 x86_64 GNU/Linux
>>
>> On Wed, Sep 6, 2023 at 1:31 PM wish maple  wrote:
>>
>>> By the way, you can try to use a memory-profiler like [1] and [2] .
>>> It would be help to find how the memory is used
>>>
>>> Best,
>>> Xuwei Fu
>>>
>>> [1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
>>> [2] https://google.github.io/tcmalloc/gperftools.html
>>>
>>>
>>> Felipe Oliveira Carvalho  于2023年9月7日周四 00:28写道:
>>>
>>> > > (a) stays pretty stable throughout the scan (stays < 1G), (b) keeps
>>> > increasing during the scan (looks linear to the number of files
>>> scanned).
>>> >
>>> > I wouldn't take this to mean a memory leak but the memory allocator not
>>> > paging out virtual memory that has been allocated throughout the scan.
>>> > Could you run your workload under a memory profiler?
>>> >
>>> > (3) Scan the same dataset twice in the same process doesn't increase
>>> the
>>> > max rss.
>>> >
>>> > Another sign this isn't a leak, just the allocator reaching a level of
>>> > memory commitment that it doesn't feel like undoing.
>>> >
>>> > --
>>> > Felipe
>>> >
>>> > On Wed, Sep 6, 2023 at 12:56 PM Li Jin  wrote:
>>> >
>>> > > Hello,
>>> > >
>>> > > I have been testing "What is the max rss needed to scan through
>>> ~100G of
>>> > > data in a parquet stored in gcs using Arrow C++".
>>> > >
>>> > > The current answer is about ~6G of memory which seems a bit high so I
>>> > > looked into it. What I observed during the process led me to think
>>> that
>>> > > there are some potential cache/memory issues in the dataset/parquet
>>> cpp
>>> > > code.
>>> > >
>>> > > Main observation:
>>> > > (1) As I am scanning through the dataset, I printed out (a) memory
>>> > > allocated by the memory pool from ScanOptions (b) process rss. I
>>> found
>>> > that
>>> > > while (a) stays pretty stable throughout the scan (stays < 1G), (b)
>>> keeps
>>> > > increasing during the scan (looks linear to the number of files
>>> scanned).
>>> > > (2) I tested ScanNode in Arrow as well as an in-house library that
>>> > > implements its own "S3Dataset" similar to Arrow dataset, both showing
>>> > > similar rss usage. (Which led me to think the issue is more likely
>>> to be
>>> > in
>>> > > the parquet cpp code instead of dataset code).
>>> > > (3) Scan the same dataset twice in the same process doesn't increase
>>> the
>>> > > max rss.
>>> > >
>>> > > I plan to look into the parquet cpp/dataset code but I wonder if
>>> someone
>>> > > has some clues what the issue might be or where to look at?
>>> > >
>>> > > Thanks,
>>> > > Li
>>> > >
>>> >
>>>
>>


Re: [C++] Potential cache/memory leak when reading parquet

2023-09-06 Thread Li Jin
Reporting back with some new findings.

Re Felipe and Antoine:
I tried both of Antoine's suggestions (swapping the default allocator and
calling ReleaseUnused), but neither seems to affect the max rss. In addition,
I managed to repro the issue by reading a list of n local parquet files that
all point to the same file, i.e., {"a.parquet", "a.parquet", ... }. I am also
able to crash my process by passing a large enough n (I observed rss keep
going up until eventually the process gets killed). This observation led me
to think there might actually be some memory leak issues.

Re Xuwei:
Thanks for the tips. I am gonna try a memory profiler next and see
what I can find.

I am gonna keep looking into this but again, any ideas / suggestions are
appreciated (and thanks for all the help so far!)

Li






On Wed, Sep 6, 2023 at 1:59 PM Li Jin  wrote:

> Thanks all for the additional suggestions. Will try it but want to answer
> Antoine's question first:
>
> > Which leads to the question: what is your OS?
>
> I am testing this on Debian 5.4.228 x86_64 GNU/Linux
>
> On Wed, Sep 6, 2023 at 1:31 PM wish maple  wrote:
>
>> By the way, you can try to use a memory-profiler like [1] and [2] .
>> It would be help to find how the memory is used
>>
>> Best,
>> Xuwei Fu
>>
>> [1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
>> [2] https://google.github.io/tcmalloc/gperftools.html
>>
>>
>> Felipe Oliveira Carvalho  于2023年9月7日周四 00:28写道:
>>
>> > > (a) stays pretty stable throughout the scan (stays < 1G), (b) keeps
>> > increasing during the scan (looks linear to the number of files
>> scanned).
>> >
>> > I wouldn't take this to mean a memory leak but the memory allocator not
>> > paging out virtual memory that has been allocated throughout the scan.
>> > Could you run your workload under a memory profiler?
>> >
>> > (3) Scan the same dataset twice in the same process doesn't increase the
>> > max rss.
>> >
>> > Another sign this isn't a leak, just the allocator reaching a level of
>> > memory commitment that it doesn't feel like undoing.
>> >
>> > --
>> > Felipe
>> >
>> > On Wed, Sep 6, 2023 at 12:56 PM Li Jin  wrote:
>> >
>> > > Hello,
>> > >
>> > > I have been testing "What is the max rss needed to scan through ~100G
>> of
>> > > data in a parquet stored in gcs using Arrow C++".
>> > >
>> > > The current answer is about ~6G of memory which seems a bit high so I
>> > > looked into it. What I observed during the process led me to think
>> that
>> > > there are some potential cache/memory issues in the dataset/parquet
>> cpp
>> > > code.
>> > >
>> > > Main observation:
>> > > (1) As I am scanning through the dataset, I printed out (a) memory
>> > > allocated by the memory pool from ScanOptions (b) process rss. I found
>> > that
>> > > while (a) stays pretty stable throughout the scan (stays < 1G), (b)
>> keeps
>> > > increasing during the scan (looks linear to the number of files
>> scanned).
>> > > (2) I tested ScanNode in Arrow as well as an in-house library that
>> > > implements its own "S3Dataset" similar to Arrow dataset, both showing
>> > > similar rss usage. (Which led me to think the issue is more likely to
>> be
>> > in
>> > > the parquet cpp code instead of dataset code).
>> > > (3) Scan the same dataset twice in the same process doesn't increase
>> the
>> > > max rss.
>> > >
>> > > I plan to look into the parquet cpp/dataset code but I wonder if
>> someone
>> > > has some clues what the issue might be or where to look at?
>> > >
>> > > Thanks,
>> > > Li
>> > >
>> >
>>
>


Re: [C++] Potential cache/memory leak when reading parquet

2023-09-06 Thread Li Jin
Thanks all for the additional suggestions. Will try it but want to answer
Antoine's question first:

> Which leads to the question: what is your OS?

I am testing this on Debian 5.4.228 x86_64 GNU/Linux

On Wed, Sep 6, 2023 at 1:31 PM wish maple  wrote:

> By the way, you can try to use a memory-profiler like [1] and [2] .
> It would be help to find how the memory is used
>
> Best,
> Xuwei Fu
>
> [1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
> [2] https://google.github.io/tcmalloc/gperftools.html
>
>
> Felipe Oliveira Carvalho  于2023年9月7日周四 00:28写道:
>
> > > (a) stays pretty stable throughout the scan (stays < 1G), (b) keeps
> > increasing during the scan (looks linear to the number of files scanned).
> >
> > I wouldn't take this to mean a memory leak but the memory allocator not
> > paging out virtual memory that has been allocated throughout the scan.
> > Could you run your workload under a memory profiler?
> >
> > (3) Scan the same dataset twice in the same process doesn't increase the
> > max rss.
> >
> > Another sign this isn't a leak, just the allocator reaching a level of
> > memory commitment that it doesn't feel like undoing.
> >
> > --
> > Felipe
> >
> > On Wed, Sep 6, 2023 at 12:56 PM Li Jin  wrote:
> >
> > > Hello,
> > >
> > > I have been testing "What is the max rss needed to scan through ~100G
> of
> > > data in a parquet stored in gcs using Arrow C++".
> > >
> > > The current answer is about ~6G of memory which seems a bit high so I
> > > looked into it. What I observed during the process led me to think that
> > > there are some potential cache/memory issues in the dataset/parquet cpp
> > > code.
> > >
> > > Main observation:
> > > (1) As I am scanning through the dataset, I printed out (a) memory
> > > allocated by the memory pool from ScanOptions (b) process rss. I found
> > that
> > > while (a) stays pretty stable throughout the scan (stays < 1G), (b)
> keeps
> > > increasing during the scan (looks linear to the number of files
> scanned).
> > > (2) I tested ScanNode in Arrow as well as an in-house library that
> > > implements its own "S3Dataset" similar to Arrow dataset, both showing
> > > similar rss usage. (Which led me to think the issue is more likely to
> be
> > in
> > > the parquet cpp code instead of dataset code).
> > > (3) Scan the same dataset twice in the same process doesn't increase
> the
> > > max rss.
> > >
> > > I plan to look into the parquet cpp/dataset code but I wonder if
> someone
> > > has some clues what the issue might be or where to look at?
> > >
> > > Thanks,
> > > Li
> > >
> >
>


Re: [C++] Potential cache/memory leak when reading parquet

2023-09-06 Thread Li Jin
> Hi Jin,
> Do you have more information about the parquet file?

This is the metadata for one file (I scanned about 2000 files in total):

  created_by: parquet-mr version 1.12.3 (build f8dced182c4c1fbdec6ccb3185537b5a01e6ed6b)
  num_columns: 840
  num_rows: 87382
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 247053


On Wed, Sep 6, 2023 at 12:22 PM wish maple  wrote:

> 1. In dataset, it might have `fragment_readahead` or other.
> 2. In Parquet, if prebuffer is enabled, it will prebuffer some column ( See
> `FileReaderImpl::GetRecordBatchReader`)
> 3. In Parquet, if non-buffered read is enabled, when read a column, the
> whole ColumChunk would be read.
> Otherwise, it will "buffered" read it decided by buffer-size
>
> Maybe I forgot someplaces. You can try to check that.
>
> Best
> Xuwei Fu
>
> Li Jin  于2023年9月7日周四 00:16写道:
>
> > Thanks both for the quick response! I wonder if there is some code in
> > parquet cpp  that might be keeping some cached information (perhaps
> > metadata) per file scanned?
> >
> > On Wed, Sep 6, 2023 at 12:10 PM wish maple 
> wrote:
> >
> > > I've met lots of Parquet Dataset issues. The main problem is that
> > currently
> > > we have 2 sets or API
> > > and they have different scan-options. And sometimes different
> interfaces
> > > like `to_batches()` or
> > > others would enable different scan options.
> > >
> > > I think [2] is similar to your problem. 1-4 are some issues I met
> before.
> > >
> > > As for the code, you may take a look at :
> > > 1. ParquetFileFormat and Dataset related.
> > > 2. FileSystem and CacheRange. Parquet might use this to handle
> pre-buffer
> > > 3. How Parquet RowReader handle IO
> > >
> > > [1] https://github.com/apache/arrow/issues/36765
> > > [2] https://github.com/apache/arrow/issues/37139
> > > [3] https://github.com/apache/arrow/issues/36587
> > > [4] https://github.com/apache/arrow/issues/37136
> > >
> > > Li Jin  于2023年9月6日周三 23:56写道:
> > >
> > > > Hello,
> > > >
> > > > I have been testing "What is the max rss needed to scan through ~100G
> > of
> > > > data in a parquet stored in gcs using Arrow C++".
> > > >
> > > > The current answer is about ~6G of memory which seems a bit high so I
> > > > looked into it. What I observed during the process led me to think
> that
> > > > there are some potential cache/memory issues in the dataset/parquet
> cpp
> > > > code.
> > > >
> > > > Main observation:
> > > > (1) As I am scanning through the dataset, I printed out (a) memory
> > > > allocated by the memory pool from ScanOptions (b) process rss. I
> found
> > > that
> > > > while (a) stays pretty stable throughout the scan (stays < 1G), (b)
> > keeps
> > > > increasing during the scan (looks linear to the number of files
> > scanned).
> > > > (2) I tested ScanNode in Arrow as well as an in-house library that
> > > > implements its own "S3Dataset" similar to Arrow dataset, both showing
> > > > similar rss usage. (Which led me to think the issue is more likely to
> > be
> > > in
> > > > the parquet cpp code instead of dataset code).
> > > > (3) Scan the same dataset twice in the same process doesn't increase
> > the
> > > > max rss.
> > > >
> > > > I plan to look into the parquet cpp/dataset code but I wonder if
> > someone
> > > > has some clues what the issue might be or where to look at?
> > > >
> > > > Thanks,
> > > > Li
> > > >
> > >
> >
>
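
One way to check whether per-file footer metadata alone accounts for the RSS growth (as suggested elsewhere in this thread) is to parse only the footers and watch max rss; a rough sketch, using local copies of one file as a stand-in for the ~2000 GCS files:

```python
import resource
import pyarrow.parquet as pq

paths = ["a.parquet"] * 2000                    # placeholder: same file repeated
footers = [pq.read_metadata(p) for p in paths]  # parse footers only, no column data
rss_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(len(footers), "footers parsed, max rss:", rss_kb, "kB")
```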


Re: [C++] Potential cache/memory leak when reading parquet

2023-09-06 Thread Li Jin
Thanks both for the quick response! I wonder if there is some code in
parquet cpp  that might be keeping some cached information (perhaps
metadata) per file scanned?

On Wed, Sep 6, 2023 at 12:10 PM wish maple  wrote:

> I've met lots of Parquet Dataset issues. The main problem is that currently
> we have 2 sets or API
> and they have different scan-options. And sometimes different interfaces
> like `to_batches()` or
> others would enable different scan options.
>
> I think [2] is similar to your problem. 1-4 are some issues I met before.
>
> As for the code, you may take a look at :
> 1. ParquetFileFormat and Dataset related.
> 2. FileSystem and CacheRange. Parquet might use this to handle pre-buffer
> 3. How Parquet RowReader handle IO
>
> [1] https://github.com/apache/arrow/issues/36765
> [2] https://github.com/apache/arrow/issues/37139
> [3] https://github.com/apache/arrow/issues/36587
> [4] https://github.com/apache/arrow/issues/37136
>
> Li Jin  于2023年9月6日周三 23:56写道:
>
> > Hello,
> >
> > I have been testing "What is the max rss needed to scan through ~100G of
> > data in a parquet stored in gcs using Arrow C++".
> >
> > The current answer is about ~6G of memory which seems a bit high so I
> > looked into it. What I observed during the process led me to think that
> > there are some potential cache/memory issues in the dataset/parquet cpp
> > code.
> >
> > Main observation:
> > (1) As I am scanning through the dataset, I printed out (a) memory
> > allocated by the memory pool from ScanOptions (b) process rss. I found
> that
> > while (a) stays pretty stable throughout the scan (stays < 1G), (b) keeps
> > increasing during the scan (looks linear to the number of files scanned).
> > (2) I tested ScanNode in Arrow as well as an in-house library that
> > implements its own "S3Dataset" similar to Arrow dataset, both showing
> > similar rss usage. (Which led me to think the issue is more likely to be
> in
> > the parquet cpp code instead of dataset code).
> > (3) Scan the same dataset twice in the same process doesn't increase the
> > max rss.
> >
> > I plan to look into the parquet cpp/dataset code but I wonder if someone
> > has some clues what the issue might be or where to look at?
> >
> > Thanks,
> > Li
> >
>


[C++] Potential cache/memory leak when reading parquet

2023-09-06 Thread Li Jin
Hello,

I have been testing "What is the max rss needed to scan through ~100G of
data in a parquet stored in gcs using Arrow C++".

The current answer is about ~6G of memory which seems a bit high so I
looked into it. What I observed during the process led me to think that
there are some potential cache/memory issues in the dataset/parquet cpp
code.

Main observation:
(1) As I am scanning through the dataset, I printed out (a) memory
allocated by the memory pool from ScanOptions (b) process rss. I found that
while (a) stays pretty stable throughout the scan (stays < 1G), (b) keeps
increasing during the scan (looks linear to the number of files scanned).
(2) I tested ScanNode in Arrow as well as an in-house library that
implements its own "S3Dataset" similar to Arrow dataset, both showing
similar rss usage. (Which led me to think the issue is more likely to be in
the parquet cpp code instead of dataset code).
(3) Scan the same dataset twice in the same process doesn't increase the
max rss.

I plan to look into the parquet cpp/dataset code but I wonder if someone
has some clues what the issue might be or where to look at?

Thanks,
Li
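
A rough PyArrow sketch of the measurement described in (1), using the default memory pool as a proxy for the ScanOptions pool and a local file repeated n times as a placeholder for the GCS dataset (the actual experiment code is C++ and linked later in the thread):

```python
import resource
import pyarrow as pa
import pyarrow.dataset as ds

paths = ["a.parquet"] * 200                     # placeholder for the real dataset
dataset = ds.dataset(paths, format="parquet")

for i, batch in enumerate(dataset.to_batches()):
    if i % 100 == 0:
        pool_bytes = pa.default_memory_pool().bytes_allocated()      # (a)
        rss_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss  # (b)
        print(f"batch {i}: pool={pool_bytes} bytes, max rss={rss_kb} kB")
```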


Re: Optimized way of converting list of pa.Array to pd.DataFrame with index

2023-08-31 Thread Li Jin
Although - I am curious if there are any downsides to using `self_destruct`?

On Thu, Aug 31, 2023 at 1:05 PM Li Jin  wrote:

> Ah I see - thanks for the explanation. self_destruct probably won't
> benefit in my case then. (The pa.Array here is a slice from another batch
> so there will be other references to the data backing this array)
>
> On Thu, Aug 31, 2023 at 11:24 AM David Li  wrote:
>
>> Not sure about the conversion, but regarding self_destruct: the problem
>> is that it only provides memory savings in limited situations that are hard
>> to figure out from the outside. When enabled, PyArrow will always discard
>> the reference to the array after conversion, and if there are no other
>> references, that would free the array. But different arrays may be backed
>> by the same underlying memory buffer (this is generally true for IPC and
>> Flight, for example), so freeing the array won't actually free any memory
>> since the buffer is still alive. It would only save memory if you ensure
>> each array is actually backed by its own memory allocations (which right
>> would generally mean copying data up front!).
>>
>> On Thu, Aug 31, 2023, at 11:11, Li Jin wrote:
>> > Hi,
>> >
>> > I am working on some code where I have a list of pa.Arrays and I am
>> > creating a pandas.DataFrame from it. I also want to set the index of the
>> > pd.DataFrame to be the first Array in the list.
>> >
>> > Currently I am doing sth like:
>> > "
>> > df = pa.Table.from_arrays(arrs, names=input_names).to_pandas()
>> > df.set_index(input_names[0], inplace=True)
>> > "
>> >
>> > I am curious if this is the best I can do? Also I wonder if it is still
>> > worthwhile to use the "self_destruct=True" option here (I noticed it has
>> > been EXPERIMENTAL for a long time)
>> >
>> > Thanks!
>> > Li
>>
>


Re: Optimized way of converting list of pa.Array to pd.DataFrame with index

2023-08-31 Thread Li Jin
Ah I see - thanks for the explanation. self_destruct probably won't benefit
in my case then. (The pa.Array here is a slice from another batch so there
will be other references to the data backing this array)

On Thu, Aug 31, 2023 at 11:24 AM David Li  wrote:

> Not sure about the conversion, but regarding self_destruct: the problem is
> that it only provides memory savings in limited situations that are hard to
> figure out from the outside. When enabled, PyArrow will always discard the
> reference to the array after conversion, and if there are no other
> references, that would free the array. But different arrays may be backed
> by the same underlying memory buffer (this is generally true for IPC and
> Flight, for example), so freeing the array won't actually free any memory
> since the buffer is still alive. It would only save memory if you ensure
> each array is actually backed by its own memory allocations (which right
> would generally mean copying data up front!).
>
> On Thu, Aug 31, 2023, at 11:11, Li Jin wrote:
> > Hi,
> >
> > I am working on some code where I have a list of pa.Arrays and I am
> > creating a pandas.DataFrame from it. I also want to set the index of the
> > pd.DataFrame to be the first Array in the list.
> >
> > Currently I am doing sth like:
> > "
> > df = pa.Table.from_arrays(arrs, names=input_names).to_pandas()
> > df.set_index(input_names[0], inplace=True)
> > "
> >
> > I am curious if this is the best I can do? Also I wonder if it is still
> > worthwhile to use the "self_destruct=True" option here (I noticed it has
> > been EXPERIMENTAL for a long time)
> >
> > Thanks!
> > Li
>
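
A small sketch of the pattern with the options David describes, using placeholder arrays and names in place of the originals; note that the Table must not be used after a self-destructing conversion, and the savings only materialize when each column owns its own buffers (not slices of another batch, as noted above):

```python
import pyarrow as pa

arrs = [pa.array([1, 2, 3]), pa.array([0.1, 0.2, 0.3])]   # placeholder inputs
input_names = ["key", "value"]

table = pa.Table.from_arrays(arrs, names=input_names)
# split_blocks avoids consolidating columns into big pandas blocks;
# self_destruct releases each Arrow column as soon as it has been converted.
df = table.to_pandas(split_blocks=True, self_destruct=True)
del table                                    # table is unusable after self_destruct
df.set_index(input_names[0], inplace=True)
```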


Optimized way of converting list of pa.Array to pd.DataFrame with index

2023-08-31 Thread Li Jin
Hi,

I am working on some code where I have a list of pa.Arrays and I am
creating a pandas.DataFrame from it. I also want to set the index of the
pd.DataFrame to be the first Array in the list.

Currently I am doing something like:
"
df = pa.Table.from_arrays(arrs, names=input_names).to_pandas()
df.set_index(input_names[0], inplace=True)
"

I am curious if this is the best I can do? Also I wonder if it is still
worthwhile to use the "self_destruct=True" option here (I noticed it has
been EXPERIMENTAL for a long time)

Thanks!
Li


Re: Sort a Table In C++?

2023-08-17 Thread Li Jin
Aha thanks both! I ended up using the Acero example to do it.

On Thu, Aug 17, 2023 at 5:45 PM Antoine Pitrou  wrote:

>
> Or you can simply call the "sort_indices" compute function:
> https://arrow.apache.org/docs/cpp/compute.html#sorts-and-partitions
>
>
> Le 17/08/2023 à 23:20, Ian Cook a écrit :
> > Li,
> >
> > Here's a standalone C++ example that constructs a Table and executes
> > an Acero ExecPlan to sort it:
> > https://gist.github.com/ianmcook/2aa9aa82e61c3ea4405450b93cf80fbc
> >
> > Ian
> >
> > On Thu, Aug 17, 2023 at 4:50 PM Li Jin  wrote:
> >>
> >> Hi,
> >>
> >> I am writing some C++ test and found myself in need for an c++ function
> to
> >> sort an arrow Table. Before I go around implementing one myself, I
> wonder
> >> if there is already a function that does that? (I searched the doc but
> >> didn’t find one).
> >>
> >> There is function in Acero can do it but I didn’t find a super easy way
> to
> >> wrap a Table as An Acero source node either.
> >>
> >> Appreciate it if someone can give some pointers.
> >>
> >> Thanks,
> >> Li
>
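
The same compute function is also exposed in PyArrow, which makes for a quick way to sanity-check the C++ result; a sketch of the sort_indices + take pattern with a toy table:

```python
import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({"a": [3, 1, 2], "b": ["x", "y", "z"]})
indices = pc.sort_indices(table, sort_keys=[("a", "ascending")])
print(table.take(indices))   # rows reordered by column "a"
```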


Sort a Table In C++?

2023-08-17 Thread Li Jin
Hi,

I am writing some C++ tests and found myself in need of a C++ function to
sort an Arrow Table. Before I go and implement one myself, I wonder
if there is already a function that does that? (I searched the docs but
didn’t find one.)

There is a function in Acero that can do it, but I didn’t find a super easy
way to wrap a Table as an Acero source node either.

Appreciate it if someone can give some pointers.

Thanks,
Li


Re: Acero and Substrait: How to select struct field from a struct column?

2023-08-08 Thread Li Jin
Got it - thanks! This is very helpful. We managed to generate the form
Weston suggested above (from a Python producer) and managed to get this to
work.

On Mon, Aug 7, 2023 at 2:45 PM Weston Pace  wrote:

> > But I can't figure out how to express "select struct field 0 from field 2
> > of the original table where field 2 is a struct column"
> >
> > Any idea how the substrait message should look like for the above?
>
> I believe it would be:
>
> ```
> "expression": {
>   "selection": {
> "direct_reference": {
>   "struct_field" {
> "field": 2,
> "child" {
>   "struct_field" {  "field": 0 }
> }
>   }
> }
> "root_reference": { }
>   }
> }
> ```
>
> To get the above I used the following python (requires [1] which could use
> a review and you need some way to convert the binary substrait to json, I
> used a script I have lying around):
>
> ```
> >>> import pyarrow as pa
> >>> import pyarrow.compute as pc
> >>> schema = pa.schema([pa.field("points", pa.struct([pa.field("x",
> pa.float64()), pa.field("y", pa.float64())]))])
> >>> expr = pc.field(("points", "x"))
> >>> expr.to_substrait(schema)
>  is_mutable=False>
> ```
>
> [1] https://github.com/apache/arrow/pull/34834
>
> On Tue, Aug 1, 2023 at 1:45 PM Li Jin  wrote:
>
> > Hi,
> >
> > I am recently trying to do
> > (1) assign a struct type column s
> > (2) flatten the struct columns (by assign v1=s[v1], v2=s[v2] and drop
> the s
> > column)
> >
> > via Substrait and Acero.
> >
> > However, I ran into the problem where I don't know the proper substrait
> > message to encode this (for (2))
> >
> > Normally, if I select a column from the origin table, it would look like
> > this (e.g, select column index 1 from the original table):
> >
> > selection {
> >   direct_reference {
> > struct_field {
> > 1
> > }
> >   }
> > }
> >
> > But I can't figure out how to express "select struct field 0 from field 2
> > of the original table where field 2 is a struct column"
> >
> > Any idea how the substrait message should look like for the above?
> >
>


Acero and Substrait: How to select struct field from a struct column?

2023-08-01 Thread Li Jin
Hi,

I am recently trying to do:
(1) assign a struct-type column s
(2) flatten the struct column (by assigning v1=s[v1], v2=s[v2] and dropping
the s column)

via Substrait and Acero.

However, I ran into the problem where I don't know the proper substrait
message to encode this (for (2))

Normally, if I select a column from the original table, it would look like
this (e.g., select column index 1 from the original table):

selection {
  direct_reference {
struct_field {
1
}
  }
}

But I can't figure out how to express "select struct field 0 from field 2
of the original table where field 2 is a struct column"

Any idea how the substrait message should look like for the above?


Re: scheduler() and async_scheduler() on QueryContext

2023-07-26 Thread Li Jin
Thanks Weston! Very helpful explanation.

On Tue, Jul 25, 2023 at 6:41 PM Weston Pace  wrote:

> 1) As a rule of thumb I would probably prefer `async_scheduler`.  It's more
> feature rich and simpler to use and is meant to handle "long running" tasks
> (e.g. 10s-100s of ms or more).
>
> The scheduler is a bit more complex and is intended for very fine-grained
> scheduling.  It's currently only used in a few nodes, I think the hash-join
> and the hash-group-by for things like building the hash table (after the
> build data has been accumulated).
>
> 2) Neither scheduler manages threads.  Both of them rely on the executor in
> ExecContext::executor().  The scheduler takes a "schedule task callback"
> which it expects to do the actual executor submission.  The async scheduler
> uses futures and virtual classes.  A "task" is something that can be called
> which returns a future that will be completed when the task is complete.
> Most of the time this is done by submitting something to an executor (in
> return for a future).  Sometimes this is done indirectly, for example, by
> making an async I/O call (which under the hood is usually implemented by
> submitting something to the I/O executor).
>
> On Tue, Jul 25, 2023 at 2:56 PM Li Jin  wrote:
>
> > Hi,
> >
> > I am reading Acero and got confused about the use of
> > QueryContext::scheduler() and QueryContext::async_scheduler(). So I have
> a
> > couple of questions:
> >
> > (1) What are the different purposes of these two?
> > (2) Does scheduler/aysnc_scheduler own any threads inside their
> respective
> > classes or do they use the thread pool from ExecContext::executor()?
> >
> > Thanks,
> > Li
> >
>


scheduler() and async_scheduler() on QueryContext

2023-07-25 Thread Li Jin
Hi,

I am reading Acero and got confused about the use of
QueryContext::scheduler() and QueryContext::async_scheduler(). So I have a
couple of questions:

(1) What are the different purposes of these two?
(2) Does scheduler/async_scheduler own any threads inside their respective
classes or do they use the thread pool from ExecContext::executor()?

Thanks,
Li


Re: C++: State of parquet 2.x / nanosecond support

2023-07-15 Thread Li Jin
Thanks!

On Fri, Jul 14, 2023 at 10:16 AM wish maple  wrote:

> Hi, Li
> Parquet 2.6 has been supported for a long time, and recently, in Parquet
> C++
> and Python, Parquet 2.6 has been set to the default version of Parquet
> writer [1] [2].
> So I think you can just use it! However, I don't know whether nanoarrow
> supports it.
>
> Best,
> Xuwei Fu
>
> [1] https://lists.apache.org/thread/027g366yr3m03hwtpst6sr58b3trwhsm
> [2] https://github.com/apache/arrow/pull/36137
>
> On 2023/07/14 13:25:22 Li Jin wrote:
> > Hi,
> >
> > Recently I found myself in the need of nanosecond granularity timestamp.
> > IIUC this is something supported in the newer version of parquet (2.6
> > perhaps)? I wonder what is the state of that in Arrow and parquet cpp?
> >
> > Thanks,
> > Li
> >
>
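
A minimal round-trip check, assuming a recent PyArrow where format 2.6 is the writer default (the version argument is shown explicitly for older releases; the file name is a placeholder):

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"ts": pa.array([1, 2, 3], type=pa.timestamp("ns"))})
pq.write_table(table, "ns.parquet", version="2.6")   # 2.6 adds TIMESTAMP(NANOS)
roundtrip = pq.read_table("ns.parquet")
print(roundtrip.schema.field("ts").type)             # timestamp[ns]
```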


C++: State of parquet 2.x / nanosecond support

2023-07-14 Thread Li Jin
Hi,

Recently I found myself in need of nanosecond-granularity timestamps.
IIUC this is something supported in newer versions of Parquet (2.6,
perhaps)? I wonder what the state of that is in Arrow and parquet cpp?

Thanks,
Li


Re: Confusion on substrait AggregateRel::groupings and Arrow consumer

2023-07-10 Thread Li Jin
Thanks Weston. This is probably an issue with ibis producer then:
https://github.com/ibis-project/ibis-substrait/blob/6db09a721281e1fc325cf6d23c0a1a20fe37af73/ibis_substrait/compiler/translate.py#L1100

I will change that code and see if I can get it to work.

On Mon, Jul 10, 2023 at 5:47 PM Weston Pace  wrote:

> Yes, that is correct.
>
> What Substrait calls "groupings" is what is often referred to in SQL as
> "grouping sets".  These allow you to compute the same aggregates but group
> by different criteria.  Two very common ways of creating grouping sets are
> "group by cube" and "group by rollup".  Snowflake's documentation for
> rollup[1] describes the motivation quite well:
>
> > You can think of rollup as generating multiple result sets, each
> > of which (after the first) is the aggregate of the previous result
> > set. So, for example, if you own a chain of retail stores, you
> > might want to see the profit for:
> >  * Each store.
> >  * Each city (large cities might have multiple stores).
> >  * Each state.
> >  * Everything (all stores in all states).
>
> Acero does not currently handle more than one grouping set.
>
>
> [1] https://docs.snowflake.com/en/sql-reference/constructs/group-by-rollup
>
> On Mon, Jul 10, 2023 at 2:22 PM Li Jin  wrote:
>
> > Hi,
> >
> > I am looking at the substrait protobuf for AggregateRel as well the Acero
> > substrait consumer code:
> >
> >
> >
> https://github.com/apache/arrow/blob/main/cpp/src/arrow/engine/substrait/relation_internal.cc#L851
> >
> >
> https://github.com/substrait-io/substrait/blob/main/proto/substrait/algebra.proto#L209
> >
> > Looks like in Substrait, AggregateRel can have multiple groupings and each
> > grouping can have multiple expressions. Let's say now I want to "compute
> > sum and mean on column A group by column B, C, D" (for Acero to execute).
> > Is the right way to create one grouping with 3 expressions (direct
> > reference) for "column B, C, D"?
> >
> > Thanks,
> > Li
> >
>


Confusion on substrait AggregateRel::groupings and Arrow consumer

2023-07-10 Thread Li Jin
Hi,

I am looking at the substrait protobuf for AggregateRel as well the Acero
substrait consumer code:

https://github.com/apache/arrow/blob/main/cpp/src/arrow/engine/substrait/relation_internal.cc#L851
https://github.com/substrait-io/substrait/blob/main/proto/substrait/algebra.proto#L209

Looks like in Substrait, AggregateRel can have multiple groupings and each
grouping can have multiple expressions. Let's say I want to "compute
sum and mean on column A group by columns B, C, D" (for Acero to execute).
Is the right way to do this to create one grouping with 3 expressions (direct
references) for "columns B, C, D"?

Thanks,
Li


Re: [C++] Dealing with third party method that raises exception

2023-06-29 Thread Li Jin
Thanks Antoine - the examples are useful - I can use the same pattern for
now. Thanks for the quick response!

On Thu, Jun 29, 2023 at 10:47 AM Antoine Pitrou  wrote:

>
> Hi Li,
>
> There is not currently, but it would probably be a useful small utility.
> If you look for `std::exception` in the codebase, you'll find that there
> a couple of places where we turn it into a Status already.
>
> Regards
>
> Antoine.
>
>
> Le 29/06/2023 à 16:20, Li Jin a écrit :
> > Hi,
> >
> > IIUC, most of the Arrow C++ code doesn't use exceptions. My question
> is
> > are there some Arrow utility / macro that wrap the function/code that
> might
> > raise an exception and turn that into code that returns an arrow error
> > Status?
> >
> > Thanks!
> > Li
> >
>
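
A minimal sketch of the kind of small utility discussed above, assuming
nothing beyond arrow::Status (the name RunNoThrow is made up here; Arrow
itself does this inline in the few places mentioned):

```cpp
#include <exception>
#include <utility>

#include "arrow/status.h"

// Run a callable that may throw and convert any std::exception into an
// arrow::Status instead of letting the exception escape.
template <typename Fn>
arrow::Status RunNoThrow(Fn&& fn) {
  try {
    std::forward<Fn>(fn)();
    return arrow::Status::OK();
  } catch (const std::exception& e) {
    return arrow::Status::UnknownError("Third-party call threw: ", e.what());
  } catch (...) {
    return arrow::Status::UnknownError("Third-party call threw a non-standard exception");
  }
}
```

Usage would then look like `ARROW_RETURN_NOT_OK(RunNoThrow([&] { third_party_call(); }));`.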


[C++] Dealing with third party method that raises exception

2023-06-29 Thread Li Jin
Hi,

IIUC, most of the Arrow C++ code doesn't use exceptions. My question is:
is there some Arrow utility / macro that wraps a function/code block that might
raise an exception and turns it into code that returns an Arrow error
Status?

Thanks!
Li


Re: Turn a vector of Scalar to an Array/ArrayData of the same datatype

2023-06-16 Thread Li Jin
Thanks Jin! That's perfect.

On Thu, Jun 15, 2023 at 11:21 PM Jin Shang  wrote:

> Hi Li,
>
> I've faced this issue before, and I ended up using a generic ArrayBuilder,
> for example:
>
> ```cpp
> auto type = int32();
> std::vector<std::shared_ptr<Scalar>> scalars = {MakeScalar(1),
> MakeScalar(2)};
>
> ARROW_ASSIGN_OR_RAISE(std::unique_ptr<ArrayBuilder> builder,
> MakeBuilder(type));
> ARROW_RETURN_NOT_OK(builder->AppendScalars(scalars));
> ARROW_ASSIGN_OR_RAISE(auto arr, builder->Finish());
> ```
>
> Best,
> Jin
>
>
> On Fri, Jun 16, 2023 at 5:23 AM Li Jin  wrote:
>
> > Hi,
> >
> > I find myself in need of a function to turn a vector of Scalar to an
> Array
> > of the same datatype. The data type is known at the runtime. e.g.
> >
> > shared_ptr<Array> concat_scalars(vector<shared_ptr<Scalar>> values,
> > shared_ptr<DataType> type);
> >
> > I wonder if I need to use sth like Scalar::Accept(ScalarVisitor*) or is
> > there an easier/better way to achieve this?
> >
> > For context, I am trying to implement UDF support for hash aggregation,
> > each UDF invocation will give me back one Scalar (for each group), and I
> > need to concat them in the HashAggregateKernel finalize method. So
> > performance is not a large concern here, time in the UDF would likely
> > dominate the total runtime.
> >
> > Thanks!
> > Li
> >
>


Turn a vector of Scalar to an Array/ArrayData of the same datatype

2023-06-15 Thread Li Jin
Hi,

I find myself in need of a function to turn a vector of Scalar to an Array
of the same datatype. The data type is known at runtime, e.g.

shared_ptr<Array> concat_scalars(vector<shared_ptr<Scalar>> values,
shared_ptr<DataType> type);

I wonder if I need to use sth like Scalar::Accept(ScalarVisitor*) or is
there an easier/better way to achieve this?

For context, I am trying to implement UDF support for hash aggregation,
each UDF invocation will give me back one Scalar (for each group), and I
need to concat them in the HashAggregateKernel finalize method. So
performance is not a large concern here, time in the UDF would likely
dominate the total runtime.

Thanks!
Li


Re: Group rows in a stream of record batches by group id?

2023-06-13 Thread Li Jin
(Admittedly, the PR title of [1] doesn't reflect that only the scalar aggregate
UDF is implemented and not the hash one - that is an oversight on my part -
sorry)

On Tue, Jun 13, 2023 at 3:51 PM Li Jin  wrote:

> Thanks Weston.
>
> I think I found what you pointed out to me before which is this bit of
> code:
>
> https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118
> I will try if I can adapt this to be used in streaming situation.
>
> > I know you recently added [1] and I'm maybe a little uncertain what
> > the difference is between this ask and the capabilities added in [1].
>
> In [1], I implemented scalar aggregate UDF, which just concat all the
> input batches and applies the UDF. Now I am trying to implement the
> grouped/hash aggregate version. The idea is that the group by node will
> call:
>
> https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/aggregate_node.cc#L800
>
> So I am looking into implementing a hash aggregate kernel backed by a
> Python UDF, which, in turn, needs to implement the logic I asked about above.
>
> On Tue, Jun 13, 2023 at 3:11 PM Weston Pace  wrote:
>
>> Are you looking for something in C++ or python?  We have a thing called
>> the
>> "grouper" (arrow::compute::Grouper in arrow/compute/row/grouper.h) which
>> (if memory serves) is the heart of the functionality in C++.  It would be
>> nice to add some python bindings for this functionality as this ask comes
>> up from pyarrow users pretty regularly.
>>
>> The grouper is used in src/arrow/dataset/partition.h to partition a record
>> batch into groups of batches.  This is how the dataset writer writes a
>> partitioned dataset.  It's a good example of how you would use the grouper
>> for a "one batch in, one batch per group out" use case.
>>
>> The grouper can also be used in a streaming situation (many batches in,
>> one
>> batch per group out).  In fact, the grouper is what is used by the group
>> by
>> node.  I know you recently added [1] and I'm maybe a little uncertain what
>> the difference is between this ask and the capabilities added in [1].
>>
>> [1] https://github.com/apache/arrow/pull/35514
>>
>> On Tue, Jun 13, 2023 at 8:23 AM Li Jin  wrote:
>>
>> > Hi,
>> >
>> > I am trying to write a function that takes a stream of record batches
>> > (where the last column is group id), and produces k record batches,
>> where
>> > record batches k_i contain all the rows with group id == i.
>> >
>> > Pseudocode is sth like:
>> >
>> > def group_rows(batches, k) -> array[RecordBatch] {
>> >   builders = array[RecordBatchBuilder](k)
>> >   for batch in batches:
>> ># Assuming last column is the group id
>> >group_ids = batch.column(-1)
>> >for i in batch.num_rows():
>> > k_i = group_ids[i]
>> > builders[k_i].append(batch[i])
>> >
>> >batches = array[RecordBatch](k)
>> >for i in range(k):
>> >batches[i] = builders[i].build()
>> >return batches
>> > }
>> >
>> > I wonder if there is some existing code that does something like this?
>> > (Specifically, I didn't find code that can append a row or rows to a
>> > RecordBatchBuilder, either one row given a row index or multiple rows
>> > given a list of row indices.)
>> >
>>
>
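
For reference, a self-contained sketch of the "one batch in, k batches out"
step that sidesteps the Grouper entirely by filtering on the group id column;
it assumes int32 group ids in [0, k) in the last column and does k passes over
the batch, so it is only illustrative (the function name is made up):

```cpp
#include <memory>
#include <vector>

#include "arrow/api.h"
#include "arrow/compute/api.h"

// Split one batch into k batches, batch i holding the rows with group id == i.
arrow::Result<std::vector<std::shared_ptr<arrow::RecordBatch>>> SplitByGroupId(
    const std::shared_ptr<arrow::RecordBatch>& batch, int32_t k) {
  namespace cp = arrow::compute;
  std::vector<std::shared_ptr<arrow::RecordBatch>> out;
  std::shared_ptr<arrow::Array> group_ids =
      batch->column(batch->num_columns() - 1);
  for (int32_t i = 0; i < k; ++i) {
    // Boolean mask selecting the rows whose group id equals i.
    ARROW_ASSIGN_OR_RAISE(
        arrow::Datum mask,
        cp::CallFunction("equal", {arrow::Datum(group_ids), arrow::Datum(i)}));
    ARROW_ASSIGN_OR_RAISE(arrow::Datum filtered, cp::Filter(batch, mask));
    out.push_back(filtered.record_batch());
  }
  return out;
}
```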


Re: Group rows in a stream of record batches by group id?

2023-06-13 Thread Li Jin
Thanks Weston.

I think I found what you pointed out to me before which is this bit of code:
https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118
I will try if I can adapt this to be used in streaming situation.

> I know you recently added [1] and I'm maybe a little uncertain what
> the difference is between this ask and the capabilities added in [1].

In [1], I implemented the scalar aggregate UDF, which just concatenates all the input
batches and applies the UDF. Now I am trying to implement the grouped/hash
aggregate version. The idea is that the group by node will call:
https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/aggregate_node.cc#L800

So I am looking into implementing a hash aggregate kernel backed by a
Python UDF, which, in turn, needs to implement the logic I asked about above.

On Tue, Jun 13, 2023 at 3:11 PM Weston Pace  wrote:

> Are you looking for something in C++ or python?  We have a thing called the
> "grouper" (arrow::compute::Grouper in arrow/compute/row/grouper.h) which
> (if memory serves) is the heart of the functionality in C++.  It would be
> nice to add some python bindings for this functionality as this ask comes
> up from pyarrow users pretty regularly.
>
> The grouper is used in src/arrow/dataset/partition.h to partition a record
> batch into groups of batches.  This is how the dataset writer writes a
> partitioned dataset.  It's a good example of how you would use the grouper
> for a "one batch in, one batch per group out" use case.
>
> The grouper can also be used in a streaming situation (many batches in, one
> batch per group out).  In fact, the grouper is what is used by the group by
> node.  I know you recently added [1] and I'm maybe a little uncertain what
> the difference is between this ask and the capabilities added in [1].
>
> [1] https://github.com/apache/arrow/pull/35514
>
> On Tue, Jun 13, 2023 at 8:23 AM Li Jin  wrote:
>
> > Hi,
> >
> > I am trying to write a function that takes a stream of record batches
> > (where the last column is group id), and produces k record batches, where
> > record batches k_i contain all the rows with group id == i.
> >
> > Pseudocode is sth like:
> >
> > def group_rows(batches, k) -> array[RecordBatch] {
> >   builders = array[RecordBatchBuilder](k)
> >   for batch in batches:
> ># Assuming last column is the group id
> >group_ids = batch.column(-1)
> >for i in batch.num_rows():
> > k_i = group_ids[i]
> > builders[k_i].append(batch[i])
> >
> >batches = array[RecordBatch](k)
> >for i in range(k):
> >batches[i] = builders[i].build()
> >return batches
> > }
> >
> > I wonder if there is some existing code that does something like this?
> > (Specifically, I didn't find code that can append a row or rows to a
> > RecordBatchBuilder, either one row given a row index or multiple rows
> > given a list of row indices.)
> >
>


Group rows in a stream of record batches by group id?

2023-06-13 Thread Li Jin
Hi,

I am trying to write a function that takes a stream of record batches
(where the last column is the group id), and produces k record batches, where
record batch k_i contains all the rows with group id == i.

Pseudocode is sth like:

def group_rows(batches, k) -> array[RecordBatch] {
  builders = array[RecordBatchBuilder](k)
  for batch in batches:
   # Assuming last column is the group id
   group_ids = batch.column(-1)
   for i in batch.num_rows():
k_i = group_ids[i]
builders[k_i].append(batch[i])

   batches = array[RecordBatch](k)
   for i in range(k):
   batches[i] = builders[i].build()
   return batches
}

I wonder if there is some existing code that does something like this?
(Specifically, I didn't find code that can append a row or rows to a
RecordBatchBuilder, either one row given a row index or multiple rows
given a list of row indices.)


Re: Converting Pandas DataFrame <-> Struct Array?

2023-06-13 Thread Li Jin
Gotcha - If there is no penalty from RecordBatch<->StructArray then I am
happy with the current approach - thanks!

For Spencer's question, the reason I use StructArray is that the
kernel interfaces I am interested in use the Array interface instead of
RecordBatch, so a StructArray is easier than a RecordBatch to pass to
kernels.

On Tue, Jun 13, 2023 at 4:15 AM Joris Van den Bossche <
jorisvandenboss...@gmail.com> wrote:

> I think your original code roundtripping through RecordBatch
> (`pa.RecordBatch.from_pandas(df).to_struct_array()`) is the best
> option at the moment. The RecordBatch<->StructArray part is a cheap
> (zero-copy) conversion, and by using RecordBatch.from_pandas, you can
> rely on all pandas<->arrow conversion logic that is implemented in
> pyarrow (and which keeps the data columnar, in contrast to
> `df.itertuples()` which converts the data into rows of python objects
> as intermediate).
>
> Given that the conversion through RecordBatch works nicely, I am not
> sure it is worth it to add new APIs to directly convert between
> StructArray and pandas DataFrames.
>
> Joris
>
> On Mon, 12 Jun 2023 at 20:32, Spencer Nelson  wrote:
> >
> > Here's a one-liner that does it, but I expect it's moderately slower than
> > the RecordBatch version:
> >
> > pa.array(df.itertuples(index=False), type=pa.struct([pa.field(col,
> > pa.from_numpy_dtype(df.dtypes[col])) for col in df.columns]))
> >
> > Most of the complexity is in the 'type'. It's less scary than it looks,
> and
> > if you can afford multiple lines I think it's almost readable:
> >
> > fields = [pa.field(col, pa.from_numpy_dtype(df.dtypes[col])) for col in
> > df.columns]
> > pa_type = pa.struct(fields)
> > pa.array(df.itertuples(index=False), type=pa_type)
> >
> > But this seems like a classic XY problem. What is the root issue you're
> > trying to solve? Why avoid RecordBatch?
> >
> > On Mon, Jun 12, 2023 at 11:14 AM Li Jin  wrote:
> >
> > >
> > > Gentle bump.
> > >
> > > Not a big deal if I need to use the API above to do so, but bump in
> case
> > > someone has a better way.
> > >
> > > On Fri, Jun 9, 2023 at 4:34 PM Li Jin  wrote:
> > >
> > > > Hello,
> > > >
> > > > I am looking for the best ways for converting Pandas DataFrame <->
> Struct
> > > > Array.
> > > >
> > > > Currently I have:
> > > >
> > > > pa.RecordBatch.from_pandas(df).to_struct_array()
> > > >
> > > > and
> > > >
> > > > pa.RecordBatch.from_struct_array(s_array).to_pandas()
> > > >
> > > > - I wonder if there is a direct way to go from DataFrame <-> Struct
> Array
> > > > without going through RecordBatch?
> > > >
> > > > Thanks,
> > > > Li
> > > >
> > >
>


Re: Converting Pandas DataFrame <-> Struct Array?

2023-06-12 Thread Li Jin
Gentle bump.

Not a big deal if I need to use the API above to do so, but bump in case
someone has a better way.

On Fri, Jun 9, 2023 at 4:34 PM Li Jin  wrote:

> Hello,
>
> I am looking for the best ways for converting Pandas DataFrame <-> Struct
> Array.
>
> Currently I have:
>
> pa.RecordBatch.from_pandas(df).to_struct_array()
>
> and
>
> pa.RecordBatch.from_struct_array(s_array).to_pandas()
>
> - I wonder if there is a direct way to go from DataFrame <-> Struct Array
> without going through RecordBatch?
>
> Thanks,
> Li
>


Converting Pandas DataFrame <-> Struct Array?

2023-06-09 Thread Li Jin
Hello,

I am looking for the best ways for converting Pandas DataFrame <-> Struct
Array.

Currently I have:

pa.RecordBatch.from_pandas(df).to_struct_array()

and

pa.RecordBatch.from_struct_array(s_array).to_pandas()

- I wonder if there is a direct way to go from DataFrame <-> Struct Array
without going through RecordBatch?

Thanks,
Li


Re: Github command to rerun CI checks?

2023-04-18 Thread Li Jin
Got it - Thanks for all the information, super helpful.

Li

On Tue, Apr 18, 2023 at 9:13 AM Raúl Cumplido 
wrote:

> Hi Li,
>
> As a committer I am able to login to Appveyor with my GitHub account
> and I can see all the builds for the Arrow project here:
> https://ci.appveyor.com/project/ApacheSoftwareFoundation/arrow/history
> I can go to a specific build clicking on it, example:
>
> https://ci.appveyor.com/project/ApacheSoftwareFoundation/arrow/builds/46811349
> and I can click on "RE-BUILD PR"
> I don't think I was given permission apart for doing that, so you
> should be able to give that a try.
>
> We don't have a way of running PR checks as we do with the crossbow
> command. We could investigate if there is a way to do it via API.
>
> Thanks,
> Raúl
>
> El mar, 18 abr 2023 a las 14:47, Li Jin ()
> escribió:
> >
> > Ah, thank you! This UI works nice except that it doesn't seem to work for
> > the " continuous-integration/appveyor/pr "  check
> >
> > I wonder if there is something similar to
> >
> > "@github-actions crossbow submit"
> >
> > But for the regular CI checks?
> >
> > On Mon, Apr 17, 2023 at 2:03 PM Jacob Wujciak
> 
> > wrote:
> >
> > > (you don't have to do the long way via the actions tab to reach the
> > > relevant workflow run, you can click on the 'details' link in the PR CI
> > > Report )
> > >
> > > On Mon, Apr 17, 2023 at 8:01 PM Jacob Wujciak 
> > > wrote:
> > >
> > > > Happy to help!
> > > >
> > > > The UI was recently updated:
> > > >
> > > >
> > >
> https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs#re-running-failed-jobs-in-a-workflow
> > > >
> > > > On Mon, Apr 17, 2023 at 7:57 PM Li Jin 
> wrote:
> > > >
> > > >> Thanks! I am a committer and am trying to rerun CI on a PR that I am
> > > >> reviewing (since one seems to have failed with timeout).
> > > >>
> > > >> I am not familiar with GHA UI - what do I need to do here?
> > > >>
> > > >> Li
> > > >>
> > > >> On Mon, Apr 17, 2023 at 11:44 AM Jacob Wujciak
> > > >>  wrote:
> > > >>
> > > >> > Committers can re-run workflows via the GHA UI. If you want to
> avoid
> > > >> having
> > > >> > to add small changes to be able to commit you can use empty
> commits
> > > via
> > > >> > '--allow-empty'.
> > > >> >
> > > >> > On Mon, Apr 17, 2023 at 5:25 PM Li Jin 
> wrote:
> > > >> >
> > > >> > > Hi,
> > > >> > >
> > > >> > > Is there a github command to rerun CI checks? (instead of
> pushing a
> > > >> new
> > > >> > > commit?)
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Li
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
>


Re: Github command to rerun CI checks?

2023-04-18 Thread Li Jin
Ah, thank you! This UI works nicely except that it doesn't seem to work for
the "continuous-integration/appveyor/pr" check

I wonder if there is something similar to

"@github-actions crossbow submit"

But for the regular CI checks?

On Mon, Apr 17, 2023 at 2:03 PM Jacob Wujciak 
wrote:

> (you don't have to do the long way via the actions tab to reach the
> relevant workflow run, you can click on the 'details' link in the PR CI
> Report )
>
> On Mon, Apr 17, 2023 at 8:01 PM Jacob Wujciak 
> wrote:
>
> > Happy to help!
> >
> > The UI was recently updated:
> >
> >
> https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs#re-running-failed-jobs-in-a-workflow
> >
> > On Mon, Apr 17, 2023 at 7:57 PM Li Jin  wrote:
> >
> >> Thanks! I am a committer and am trying to rerun CI on a PR that I am
> >> reviewing (since one seems to have failed with timeout).
> >>
> >> I am not familiar with GHA UI - what do I need to do here?
> >>
> >> Li
> >>
> >> On Mon, Apr 17, 2023 at 11:44 AM Jacob Wujciak
> >>  wrote:
> >>
> >> > Committers can re-run workflows via the GHA UI. If you want to avoid
> >> having
> >> > to add small changes to be able to commit you can use empty commits
> via
> >> > '--allow-empty'.
> >> >
> >> > On Mon, Apr 17, 2023 at 5:25 PM Li Jin  wrote:
> >> >
> >> > > Hi,
> >> > >
> >> > > Is there a github command to rerun CI checks? (instead of pushing a
> >> new
> >> > > commit?)
> >> > >
> >> > > Thanks,
> >> > > Li
> >> > >
> >> >
> >>
> >
>


Re: Github command to rerun CI checks?

2023-04-17 Thread Li Jin
Thanks! I am a committer and am trying to rerun CI on a PR that I am
reviewing (since one seems to have failed with timeout).

I am not familiar with GHA UI - what do I need to do here?

Li

On Mon, Apr 17, 2023 at 11:44 AM Jacob Wujciak
 wrote:

> Committers can re-run workflows via the GHA UI. If you want to avoid having
> to add small changes to be able to commit you can use empty commits via
> '--allow-empty'.
>
> On Mon, Apr 17, 2023 at 5:25 PM Li Jin  wrote:
>
> > Hi,
> >
> > Is there a github command to rerun CI checks? (instead of pushing a new
> > commit?)
> >
> > Thanks,
> > Li
> >
>


Github command to rerun CI checks?

2023-04-17 Thread Li Jin
Hi,

Is there a github command to rerun CI checks? (instead of pushing a new
commit?)

Thanks,
Li


Re: Stacktrace from Arrow status?

2023-04-04 Thread Li Jin
Thanks David!

On Tue, Apr 4, 2023 at 4:58 PM David Li  wrote:

> Yes, that's what the ARROW_EXTRA_ERROR_CONTEXT option does.
>
> On Tue, Apr 4, 2023, at 11:13, Li Jin wrote:
> > Picking up this conversation again, I noticed when I hit an error in
> > test I
> > saw this nice stacktraces:
> > ```
> >
> /home/icexelloss/workspace/arrow/cpp/src/arrow/acero/hash_aggregate_test.cc:4681:
> > Failure
> > Failed
> > '_error_or_value146.status()' failed with NotImplemented: Consume with
> > nulls
> >
> /home/icexelloss/workspace/arrow/cpp/src/arrow/acero/aggregate_node.cc:400
> >  kernels_[i]->consume(&batch_ctx, column_batch)
> >
> /home/icexelloss/workspace/arrow/cpp/src/arrow/acero/aggregate_node.cc:419
> >  DoConsume(ExecSpan(exec_batch), thread_index)
> >
> /home/icexelloss/workspace/arrow/cpp/src/arrow/acero/aggregate_node.cc:216
> >  handle_batch(batch, segment)
> >
> /home/icexelloss/workspace/arrow/cpp/src/arrow/acero/aggregate_node.cc:429
> >  HandleSegments(segmenter_.get(), batch, segment_field_ids_, handler)
> > /home/icexelloss/workspace/arrow/cpp/src/arrow/acero/source_node.cc:119
> >  output_->InputReceived(this, std::move(batch))
> >
> /home/icexelloss/workspace/arrow/cpp/src/arrow/acero/hash_aggregate_test.cc:271
> >  start_and_collect.MoveResult()
> > ```
> >
> > Is this because of the ARROW_EXTRA_ERROR_CONTEXT option?
> >
> > On Fri, Mar 24, 2023 at 12:04 PM Li Jin  wrote:
> >
> >> Thanks David!
> >>
> >> On Tue, Mar 21, 2023 at 6:32 PM David Li  wrote:
> >>
> >>> Not really, other than:
> >>>
> >>> * By searching the codebase for relevant strings.
> >>> * If you are building Arrow from source, you can use the option
> >>> ARROW_EXTRA_ERROR_CONTEXT [1] when configuring. This will get you a
> rough
> >>> stack trace (IIRC, if a function returns the status without using one
> of
> >>> the macros, it won't add a line to the trace).
> >>>
> >>> [1]:
> >>>
> https://github.com/apache/arrow/blob/1ba4425fab35d572132cb30eee6087a7dca89853/cpp/cmake_modules/DefineOptions.cmake#L608-L609
> >>>
> >>> On Tue, Mar 21, 2023, at 18:12, Li Jin wrote:
> >>> > Hi,
> >>> >
> >>> > This might be a dumb question but when Arrow code raises an invalid
> >>> status,
> >>> > I observe that it usually pops up to the user without stack
> >>> information. I
> >>> > wonder if there are any tricks to show where the invalid status is
> >>> coming
> >>> > from?
> >>> >
> >>> > Thanks,
> >>> > Li
> >>>
> >>
>
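
As confirmed above, with the ARROW_EXTRA_ERROR_CONTEXT build option enabled
each error-propagation macro that forwards a failing Status appends a
"file:line expression" line to it, which is where the per-frame lines in the
trace come from. A sketch of the pattern (LoadTable and Validate are
hypothetical helpers, not Arrow APIs):

```cpp
#include <memory>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"

// Hypothetical helpers standing in for real work that can fail.
arrow::Result<std::shared_ptr<arrow::Table>> LoadTable();
arrow::Status Validate(const arrow::Table& table);

// When built with ARROW_EXTRA_ERROR_CONTEXT, each macro below that propagates
// a failing Status appends "<file>:<line>  <expression>" to it, producing the
// pseudo stack trace shown in the test output above.
arrow::Status DoWork() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, LoadTable());
  ARROW_RETURN_NOT_OK(Validate(*table));
  return arrow::Status::OK();
}
```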


Re: Stacktrace from Arrow status?

2023-04-04 Thread Li Jin
Picking up this conversation again, I noticed that when I hit an error in a test I
saw these nice stack traces:
```
/home/icexelloss/workspace/arrow/cpp/src/arrow/acero/hash_aggregate_test.cc:4681:
Failure
Failed
'_error_or_value146.status()' failed with NotImplemented: Consume with nulls
/home/icexelloss/workspace/arrow/cpp/src/arrow/acero/aggregate_node.cc:400
 kernels_[i]->consume(&batch_ctx, column_batch)
/home/icexelloss/workspace/arrow/cpp/src/arrow/acero/aggregate_node.cc:419
 DoConsume(ExecSpan(exec_batch), thread_index)
/home/icexelloss/workspace/arrow/cpp/src/arrow/acero/aggregate_node.cc:216
 handle_batch(batch, segment)
/home/icexelloss/workspace/arrow/cpp/src/arrow/acero/aggregate_node.cc:429
 HandleSegments(segmenter_.get(), batch, segment_field_ids_, handler)
/home/icexelloss/workspace/arrow/cpp/src/arrow/acero/source_node.cc:119
 output_->InputReceived(this, std::move(batch))
/home/icexelloss/workspace/arrow/cpp/src/arrow/acero/hash_aggregate_test.cc:271
 start_and_collect.MoveResult()
```

Is this because of the ARROW_EXTRA_ERROR_CONTEXT option?

On Fri, Mar 24, 2023 at 12:04 PM Li Jin  wrote:

> Thanks David!
>
> On Tue, Mar 21, 2023 at 6:32 PM David Li  wrote:
>
>> Not really, other than:
>>
>> * By searching the codebase for relevant strings.
>> * If you are building Arrow from source, you can use the option
>> ARROW_EXTRA_ERROR_CONTEXT [1] when configuring. This will get you a rough
>> stack trace (IIRC, if a function returns the status without using one of
>> the macros, it won't add a line to the trace).
>>
>> [1]:
>> https://github.com/apache/arrow/blob/1ba4425fab35d572132cb30eee6087a7dca89853/cpp/cmake_modules/DefineOptions.cmake#L608-L609
>>
>> On Tue, Mar 21, 2023, at 18:12, Li Jin wrote:
>> > Hi,
>> >
>> > This might be a dumb question but when Arrow code raises an invalid
>> status,
>> > I observe that it usually pops up to the user without stack
>> information. I
>> > wonder if there are any tricks to show where the invalid status is
>> coming
>> > from?
>> >
>> > Thanks,
>> > Li
>>
>


Re: Zero copy cast kernels

2023-03-28 Thread Li Jin
Thanks Rok!

The original question was about a way to "verify whether a cast is zero copy
by reading source code / documentation" rather than "verify that a cast is zero
copy programmatically", but I noticed by reading the test file that int64 to
micros is indeed zero copy and I expect nanos to be the same:
https://github.com/apache/arrow/blob/e7d6c13d4ae3d8df0e9b668468b990f35c8a9556/cpp/src/arrow/compute/kernels/scalar_cast_test.cc#L1546

On Fri, Mar 24, 2023 at 12:36 PM Rok Mihevc  wrote:

> For scalar casting tests we use CheckCastZeroCopy [1] which you could
> reuse.
>
> [1]
>
> https://github.com/apache/arrow/blob/e7d6c13d4ae3d8df0e9b668468b990f35c8a9556/cpp/src/arrow/compute/kernels/scalar_cast_test.cc#L128-L138
>
> Rok
>
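
Beyond reading the test file, a quick programmatic spot-check is also possible:
cast and compare the underlying values-buffer pointers. A sketch (the helper
name is made up, and it assumes a primitive layout where buffer 1 holds the
values, which covers the int64 -> timestamp case):

```cpp
#include <memory>

#include "arrow/api.h"
#include "arrow/compute/cast.h"

// Returns true if casting `arr` to `to_type` reused the source values buffer,
// i.e. the cast was zero copy.
arrow::Result<bool> CastIsZeroCopy(
    const std::shared_ptr<arrow::Array>& arr,
    const std::shared_ptr<arrow::DataType>& to_type) {
  ARROW_ASSIGN_OR_RAISE(arrow::Datum casted, arrow::compute::Cast(arr, to_type));
  std::shared_ptr<arrow::Array> casted_arr = casted.make_array();
  return casted_arr->data()->buffers[1].get() == arr->data()->buffers[1].get();
}
```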


Zero copy cast kernels

2023-03-24 Thread Li Jin
Hello,

I recently found myself casting an int64 (nanos from epoch) into a nano
timestamp column with the C++ cast kernel (via Acero).

I expect this to be zero copy but I wonder if there is a way to check which
casts are zero copy and which are not?

Li


Re: Stacktrace from Arrow status?

2023-03-24 Thread Li Jin
Thanks David!

On Tue, Mar 21, 2023 at 6:32 PM David Li  wrote:

> Not really, other than:
>
> * By searching the codebase for relevant strings.
> * If you are building Arrow from source, you can use the option
> ARROW_EXTRA_ERROR_CONTEXT [1] when configuring. This will get you a rough
> stack trace (IIRC, if a function returns the status without using one of
> the macros, it won't add a line to the trace).
>
> [1]:
> https://github.com/apache/arrow/blob/1ba4425fab35d572132cb30eee6087a7dca89853/cpp/cmake_modules/DefineOptions.cmake#L608-L609
>
> On Tue, Mar 21, 2023, at 18:12, Li Jin wrote:
> > Hi,
> >
> > This might be a dumb question but when Arrow code raises an invalid
> status,
> > I observe that it usually pops up to the user without stack information.
> I
> > wonder if there are any tricks to show where the invalid status is coming
> > from?
> >
> > Thanks,
> > Li
>


Stacktrace from Arrow status?

2023-03-21 Thread Li Jin
Hi,

This might be a dumb question but when Arrow code raises an invalid status,
I observe that it usually pops up to the user without stack information. I
wonder if there are any tricks to show where the invalid status is coming
from?

Thanks,
Li


Re: [DISCUSS] Acero roadmap / philosophy

2023-03-14 Thread Li Jin
Late to the party.

Thanks Weston for sharing the thoughts around Acero. We are actually a
pretty heavy Acero user right now and are trying to take part in Acero
maintenance and development. Internally we are using Acero for a time
series streaming data processing system.

I would +1 on many of Weston's directions here, in particular making Acero
extensible / customizable. IMO Acero might not be the fastest "Arrow
SQL/TPC-H" engine, but the ability to customize it for ordered time series
is a huge/killer feature.

In addition to what Weston has already said, my other two cents is that I
think Acero would benefit from being separated from the Arrow core C++
library, similar to how Arrow Flight is. The main reason is that Arrow core,
being such a widely used library, benefits more from being stable, while
Acero, being a relatively new and standalone component, benefits more from
fast iteration / quick experimentation. My colleague and I are working on
https://github.com/apache/arrow/issues/15280 to make this happen.





On Fri, Mar 10, 2023 at 5:59 AM Andrew Lamb  wrote:

> I don't know much about the Acero user base, but gathering some significant
> term users (e.g. Ballista, Urban Logiq, GreptimeDB, InfluxDB IOx, etc) has
> been very helpful for DataFusion. Not only do such users bring some amount
> of maintenance capacity, but perhaps more relevantly to your discussion
> they bring a focus to the project with their usecases.
>
> With so many possible tradeoffs (e.g. streaming vs larger batch execution
> as you mention above) having people to help focus the choice of project I
> think has served DataFusion well.
>
> If Acero has such users (or potential users) perhaps reaching out to them /
> soliciting their ideas of where they want to see the project go would be a
> valuable focusing exercise.
>
> Andrew
>
> On Thu, Mar 9, 2023 at 6:35 PM Aldrin  wrote:
>
> > Thanks for sharing! There are a variety of things that I didn't know
> about
> > (such as ExecBatchBuilder) and it's interesting to hear about the
> > performance challenges.
> >
> > How much would future substrait work involve integration with Acero? I'm
> > curious how much more support of substrait is seen as valuable (should be
> > prioritized) or
> > if additional support is going to be "as-needed". Note that I have a
> > minimal understanding of how "large" substrait is and what proportion of
> it
> > is already supported by
> > Acero.
> >
> > Aldrin Montana
> > Computer Science PhD Student
> > UC Santa Cruz
> >
> >
> > On Thu, Mar 9, 2023 at 12:33 PM Antoine Pitrou 
> wrote:
> >
> > >
> > > Just a reminder for those following other implementations of Arrow,
> that
> > > Acero is the compute/execution engine subsystem baked into Arrow C++.
> > >
> > > Regards
> > >
> > > Antoine.
> > >
> > >
> > > Le 09/03/2023 à 21:20, Weston Pace a écrit :
> > > > We are getting closer to another release.  I am thinking about what
> to
> > > work
> > > > on in the next release.  I think it is a good time to have a
> discussion
> > > > about Acero in general.  This is possibly also of interest to those
> > > working
> > > > on pyarrow or r-arrow as these libraries rely on Acero for various
> > > > functionality.  Apache projects have no single owner and what follows
> > is
> > > > only my own personal opinion and plans.  Still, I will apologize in
> > > advance
> > > > for any lingering hubris or outrageous declarations of fact :)
> > > >
> > > > First, some background.  Since we started the project the landscape
> has
> > > > changed.  Most importantly, there are now more arrow-native execution
> > > > engines.  For example, datafusion, duckdb, velox, and I'm sure there
> > are
> > > > probably more.  Substrait has also been created, allowing users to
> > > > hopefully switch between different execution engines as different
> needs
> > > > arise.  Some significant contributors to Acero have taken a break or
> > > moved
> > > > onto other projects and new contributors have arrived with new
> > interests
> > > > and goals (For example, an asof join node and more focus on ordered /
> > > > streaming execution).
> > > >
> > > > I do not personally have the resources for bringing Acero's
> performance
> > > to
> > > > match that of some of the other execution engines.  I'm also not
> aware
> > of
> > > > any significant contributors attempting to do so.  I also think that
> > > having
> > > > yet another engine racing to the top of the TPC-H benchmarks is not
> the
> > > > best thing we can be doing for our users.  To be clear, our
> performance
> > > is
> > > > not "bad" but it is not "state of the art".
> > > >
> > > > ## Some significant performance challenges for Acero:
> > > >
> > > >   1. Ideally an execution engine that wants to win TPC-H should
> operate
> > > on
> > > > L2 sized batches.  To risk stating the obvious: that is not very
> large.
> > > > Typically less than 100k rows.  At that size of operation the
> > philosophy
> > > of
> > > > "we are only doing th

Re: Timestamp unit in Substrait and Arrow

2023-03-14 Thread Li Jin
Thanks Weston for the insight - for the short term we are going to try to
unify the time unit to "microseconds" to be compatible with substrait and
pay the cost of converting to nanoseconds (e.g., when passing to pandas)
when needed.

Longer term I think option (3) is probably the most practical (although
perhaps not worthwhile if the performance cost of the
microseconds/nanoseconds conversion isn't too bad in practice).



On Thu, Mar 9, 2023 at 1:36 PM Weston Pace  wrote:

> The Substrait decision for microseconds was made because, at the time, the
> goal was to keep the type system simple and universal, and there were
> systems that didn't support ns (e.g. Iceberg, postgres, duckdb, velox).
>
> A few options (off the top of my head):
>
>  1. Attempt to get a nanoseconds timestamp type adopted in Substrait.
>
> I'm not sure how much enthusiasm there will be for this.  I think Acero is
> the only consumer that would take advantage of this.  Perhaps Ibis or
> Datafusion would have some interest.  It would require changing an old
> Substrait agreement around the rules for which data types to use.
>
> 2. Treat timestamp(ns) as a variation of timestamp(us).
>
> I'm listing this for thoroughness however I don't think we can do this.
> Substrate requires timestamps to be able to go out to the year  and a
> 64-bit nanoseconds from the epoch cannot do this.
>
> 3. Treat timestamp(ns) as a user-defined type (from Substrait's
> perspective)
>
> This is probably the easiest approach in terms of consensus-building.  The
> Substrait consumer should already have the plumbing for this in
> src/arrow/engine/substrait/extension_types.h  I think getting Acero to work
> here will be pretty easy.  The trickier part might be adapting your
> producer (Ibis?)
>
> On Thu, Mar 9, 2023 at 9:43 AM Li Jin  wrote:
>
> > Hi,
> >
> > I recently came across some limitations in expressing timestamp type with
> > Substrait in the Acero substrait consumer and am curious to hear what
> > people's thoughts are.
> >
> > The particular issue that I have is when specifying timestamp type in
> > substrait, the unit is "microseconds" and there is no way to change that.
> > When integrating with Arrow, often we have timestamps in an internal
> system
> > that is of another unit, e.g., a flight service that returns a timestamp
> in
> > nanos. Also, interop with pandas, because pandas internally use
> > nanoseconds, that is another gap.
> >
> > Currently as a result, we often need to convert from nanos <-> micro
> when a
> > substrait plan is involved to specify timestamps. It feels to me as
> > something missing in substrait but I wonder what other people think.
> >
> > (Sending this to Arrow mailing list because I know some people here are
> > pretty involved with substrait and I am more familiar with the folks in
> the
> > Arrow community. Therefore wanted to get some thoughts from the people
> > here).
> >
> > Li
> >
>
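
The short-term coercion described above looks roughly like this in C++ (a
sketch; the function name is made up, and allow_time_truncation is needed
because ns -> us is a lossy cast):

```cpp
#include <memory>

#include "arrow/api.h"
#include "arrow/compute/cast.h"

// Coerce a timestamp[ns] array to timestamp[us] before handing data to a
// Substrait plan; sub-microsecond precision is dropped.
arrow::Result<arrow::Datum> ToMicros(const std::shared_ptr<arrow::Array>& arr) {
  arrow::compute::CastOptions options;
  options.allow_time_truncation = true;
  return arrow::compute::Cast(arr, arrow::timestamp(arrow::TimeUnit::MICRO),
                              options);
}
```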


Re: [ANNOUNCE] New Arrow PMC member: Will Jones

2023-03-13 Thread Li Jin
Congratulations Will!

On Mon, Mar 13, 2023 at 3:27 PM Bryce Mecum  wrote:

> Congratulations, Will!
>


Timestamp unit in Substrait and Arrow

2023-03-09 Thread Li Jin
Hi,

I recently came across some limitations in expressing timestamp type with
Substrait in the Acero substrait consumer and am curious to hear what
people's thoughts are.

The particular issue I have is that when specifying a timestamp type in
Substrait, the unit is "microseconds" and there is no way to change that.
When integrating with Arrow, we often have timestamps from an internal system
that use another unit, e.g., a Flight service that returns timestamps in
nanos. Interop with pandas is another gap, because pandas internally uses
nanoseconds.

As a result, we currently often need to convert between nanos and micros when a
Substrait plan is involved in specifying timestamps. It feels to me like
something is missing in Substrait, but I wonder what other people think.

(Sending this to Arrow mailing list because I know some people here are
pretty involved with substrait and I am more familiar with the folks in the
Arrow community. Therefore wanted to get some thoughts from the people
here).

Li


Re: testing of back-pressure

2023-02-16 Thread Li Jin
Thanks Weston for the information.

On Thu, Feb 16, 2023 at 1:32 PM Weston Pace  wrote:

> There is a little bit at the end-to-end level.  One goal is to be able to
> repartition a very large dataset.  This means we read from something bigger
> than memory and then write to it.  This workflow is tested in
> `test_write_dataset_with_backpresure` in test_dataset.py in pyarrow.
>
> Then there is a one unit test in plan_test.cc (ExecPlanExecution,
> SinkNodeBackpressure).  And of course, there is some testing in the asof
> join test.
>
> The dataset writer and scanner have their own concepts of backpressure and
> these are independently unit tested.  However, this is more or less
> external to Acero.
>
> So I think there is certainly room for improvement here.
>
> On Thu, Feb 16, 2023 at 5:34 AM Yaron Gvili  wrote:
>
> > Hi,
> >
> > What testing of back-pressure exist in Acero? I'm mostly interested in
> > testing of back-pressure that applies to any ExecNode, but could also
> learn
> > from more specific testing. If this is not well covered, I'd look into
> > implementing such testing.
> >
> >
> > Cheers,
> > Yaron.
> >
>


Re: Question about memory usage and type casting using pyarrow Table

2023-02-15 Thread Li Jin
Oh thanks, that could be a workaround! I thought pa tables are supposed to
be immutable; is there a safe way to just change the metadata?

On Wed, Feb 15, 2023 at 5:44 PM Rok Mihevc  wrote:

> Well that's suboptimal. As a workaround I suppose you could just change the
> metadata if the array is timezone aware.
>
> On Wed, Feb 15, 2023 at 10:37 PM Li Jin  wrote:
>
> > Oh found this comment:
> >
> >
> https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernels/scalar_cast_temporal.cc#L156
> >
> >
> >
> > On Wed, Feb 15, 2023 at 4:23 PM Li Jin  wrote:
> >
> > > Not sure if this is actually a bug or expected behavior - I filed
> > > https://github.com/apache/arrow/issues/34210
> > >
> > > On Wed, Feb 15, 2023 at 4:15 PM Li Jin  wrote:
> > >
> > >> Hmm..something feels off here - I did the following experiment on
> Arrow
> > >> 11 and casting timestamp-naive to int64 is much faster than casting
> > >> timestamp-naive to timestamp-utc:
> > >>
> > >> In [16]: %time table.cast(schema_int)
> > >> CPU times: user 114 µs, sys: 30 µs, total: 144 µs
> > >> *Wall time: 231 µs*
> > >> Out[16]:
> > >> pyarrow.Table
> > >> time: int64
> > >> 
> > >> time: [[0,1,2,3,4,...,9995,9996,9997,9998,]]
> > >>
> > >> In [17]: %time table.cast(schema_tz)
> > >> CPU times: user 119 ms, sys: 140 ms, total: 260 ms
> > >> *Wall time: 259 ms*
> > >> Out[17]:
> > >> pyarrow.Table
> > >> time: timestamp[ns, tz=UTC]
> > >> 
> > >> time: [[1970-01-01 00:00:00.0,1970-01-01
> > >> 00:00:00.1,1970-01-01 00:00:00.2,1970-01-01
> > >> 00:00:00.3,1970-01-01 00:00:00.4,...,1970-01-01
> > >> 00:00:00.09995,1970-01-01 00:00:00.09996,1970-01-01
> > >> 00:00:00.09997,1970-01-01 00:00:00.09998,1970-01-01
> > >> 00:00:00.0]]
> > >>
> > >> In [18]: table
> > >> Out[18]:
> > >> pyarrow.Table
> > >> time: timestamp[ns]
> > >> 
> > >> time: [[1970-01-01 00:00:00.0,1970-01-01
> > >> 00:00:00.1,1970-01-01 00:00:00.00002,1970-01-01
> > >> 00:00:00.3,1970-01-01 00:00:00.4,...,1970-01-01
> > >> 00:00:00.09995,1970-01-01 00:00:00.09996,1970-01-01
> > >> 00:00:00.09997,1970-01-01 00:00:00.09998,1970-01-01
> > >> 00:00:00.0]]
> > >>
> > >> On Wed, Feb 15, 2023 at 2:52 PM Rok Mihevc 
> > wrote:
> > >>
> > >>> I'm not sure about (1) but I'm pretty sure for (2) doing a cast of
> > >>> tz-aware
> > >>> timestamp to tz-naive should be a metadata-only change.
> > >>>
> > >>> On Wed, Feb 15, 2023 at 4:19 PM Li Jin 
> wrote:
> > >>>
> > >>> > Asking (2) because IIUC this is a metadata operation that could be
> > zero
> > >>> > copy but I am not sure if this is actually the case.
> > >>> >
> > >>> > On Wed, Feb 15, 2023 at 10:17 AM Li Jin 
> > wrote:
> > >>> >
> > >>> > > Hello!
> > >>> > >
> > >>> > > I have some questions about type casting memory usage with
> pyarrow
> > >>> Table.
> > >>> > > Let's say I have a pyarrow Table with 100 columns.
> > >>> > >
> > >>> > > (1) if I want to cast n columns to a different type (e.g., float
> to
> > >>> int).
> > >>> > > What is the smallest memory overhead that I can do? (memory
> > overhead
> > >>> of 1
> > >>> > > column, n columns or 100 columns?)
> > >>> > >
> > >>> > > (2) if I want to cast n timestamp columns from tz-native to
> tz-UTC.
> > >>> What
> > >>> > > is the smallest memory overhead that I can do? (0, 1 column, n
> > >>> columns or
> > >>> > > 100 columns?)
> > >>> > >
> > >>> > > Thanks!
> > >>> > > Li
> > >>> > >
> > >>> >
> > >>>
> > >>
> >
>
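
At the C++ level, the metadata-only version of (2) is just a shallow ArrayData
copy with the type swapped, so no buffers are touched; whether relabeling a
naive timestamp as UTC is semantically correct is exactly what the
scalar_cast_temporal.cc comment linked above is about. A sketch (the function
name is made up; it assumes a timestamp[ns] input):

```cpp
#include <memory>

#include "arrow/api.h"

// Re-type a timestamp[ns] array as timestamp[ns, UTC] by sharing the same
// buffers and only replacing the type; no values are copied or shifted.
std::shared_ptr<arrow::Array> WithUtcTimezone(
    const std::shared_ptr<arrow::Array>& arr) {
  std::shared_ptr<arrow::ArrayData> data = arr->data()->Copy();  // shallow copy
  data->type = arrow::timestamp(arrow::TimeUnit::NANO, "UTC");
  return arrow::MakeArray(data);
}
```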


Re: Question about memory usage and type casting using pyarrow Table

2023-02-15 Thread Li Jin
Oh found this comment:
https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernels/scalar_cast_temporal.cc#L156



On Wed, Feb 15, 2023 at 4:23 PM Li Jin  wrote:

> Not sure if this is actually a bug or expected behavior - I filed
> https://github.com/apache/arrow/issues/34210
>
> On Wed, Feb 15, 2023 at 4:15 PM Li Jin  wrote:
>
>> Hmm..something feels off here - I did the following experiment on Arrow
>> 11 and casting timestamp-naive to int64 is much faster than casting
>> timestamp-naive to timestamp-utc:
>>
>> In [16]: %time table.cast(schema_int)
>> CPU times: user 114 µs, sys: 30 µs, total: 144 µs
>> *Wall time: 231 µs*
>> Out[16]:
>> pyarrow.Table
>> time: int64
>> 
>> time: [[0,1,2,3,4,...,9995,9996,9997,9998,]]
>>
>> In [17]: %time table.cast(schema_tz)
>> CPU times: user 119 ms, sys: 140 ms, total: 260 ms
>> *Wall time: 259 ms*
>> Out[17]:
>> pyarrow.Table
>> time: timestamp[ns, tz=UTC]
>> 
>> time: [[1970-01-01 00:00:00.0,1970-01-01
>> 00:00:00.1,1970-01-01 00:00:00.2,1970-01-01
>> 00:00:00.3,1970-01-01 00:00:00.4,...,1970-01-01
>> 00:00:00.09995,1970-01-01 00:00:00.09996,1970-01-01
>> 00:00:00.09997,1970-01-01 00:00:00.09998,1970-01-01
>> 00:00:00.0]]
>>
>> In [18]: table
>> Out[18]:
>> pyarrow.Table
>> time: timestamp[ns]
>> 
>> time: [[1970-01-01 00:00:00.0,1970-01-01
>> 00:00:00.1,1970-01-01 00:00:00.2,1970-01-01
>> 00:00:00.3,1970-01-01 00:00:00.4,...,1970-01-01
>> 00:00:00.09995,1970-01-01 00:00:00.09996,1970-01-01
>> 00:00:00.09997,1970-01-01 00:00:00.09998,1970-01-01
>> 00:00:00.0]]
>>
>> On Wed, Feb 15, 2023 at 2:52 PM Rok Mihevc  wrote:
>>
>>> I'm not sure about (1) but I'm pretty sure for (2) doing a cast of
>>> tz-aware
>>> timestamp to tz-naive should be a metadata-only change.
>>>
>>> On Wed, Feb 15, 2023 at 4:19 PM Li Jin  wrote:
>>>
>>> > Asking (2) because IIUC this is a metadata operation that could be zero
>>> > copy but I am not sure if this is actually the case.
>>> >
>>> > On Wed, Feb 15, 2023 at 10:17 AM Li Jin  wrote:
>>> >
>>> > > Hello!
>>> > >
>>> > > I have some questions about type casting memory usage with pyarrow
>>> Table.
>>> > > Let's say I have a pyarrow Table with 100 columns.
>>> > >
>>> > > (1) if I want to cast n columns to a different type (e.g., float to
>>> int).
>>> > > What is the smallest memory overhead that I can do? (memory overhead
>>> of 1
>>> > > column, n columns or 100 columns?)
>>> > >
>>> > > (2) if I want to cast n timestamp columns from tz-native to tz-UTC.
>>> What
>>> > > is the smallest memory overhead that I can do? (0, 1 column, n
>>> columns or
>>> > > 100 columns?)
>>> > >
>>> > > Thanks!
>>> > > Li
>>> > >
>>> >
>>>
>>


Re: Question about memory usage and type casting using pyarrow Table

2023-02-15 Thread Li Jin
Not sure if this is actually a bug or expected behavior - I filed
https://github.com/apache/arrow/issues/34210

On Wed, Feb 15, 2023 at 4:15 PM Li Jin  wrote:

> Hmm..something feels off here - I did the following experiment on Arrow 11
> and casting timestamp-naive to int64 is much faster than casting
> timestamp-naive to timestamp-utc:
>
> In [16]: %time table.cast(schema_int)
> CPU times: user 114 µs, sys: 30 µs, total: 144 µs
> *Wall time: 231 µs*
> Out[16]:
> pyarrow.Table
> time: int64
> 
> time: [[0,1,2,3,4,...,9995,9996,9997,9998,]]
>
> In [17]: %time table.cast(schema_tz)
> CPU times: user 119 ms, sys: 140 ms, total: 260 ms
> *Wall time: 259 ms*
> Out[17]:
> pyarrow.Table
> time: timestamp[ns, tz=UTC]
> 
> time: [[1970-01-01 00:00:00.0,1970-01-01
> 00:00:00.1,1970-01-01 00:00:00.2,1970-01-01
> 00:00:00.3,1970-01-01 00:00:00.4,...,1970-01-01
> 00:00:00.09995,1970-01-01 00:00:00.09996,1970-01-01
> 00:00:00.09997,1970-01-01 00:00:00.09998,1970-01-01
> 00:00:00.0]]
>
> In [18]: table
> Out[18]:
> pyarrow.Table
> time: timestamp[ns]
> 
> time: [[1970-01-01 00:00:00.0,1970-01-01
> 00:00:00.1,1970-01-01 00:00:00.2,1970-01-01
> 00:00:00.3,1970-01-01 00:00:00.4,...,1970-01-01
> 00:00:00.09995,1970-01-01 00:00:00.09996,1970-01-01
> 00:00:00.09997,1970-01-01 00:00:00.09998,1970-01-01
> 00:00:00.0]]
>
> On Wed, Feb 15, 2023 at 2:52 PM Rok Mihevc  wrote:
>
>> I'm not sure about (1) but I'm pretty sure for (2) doing a cast of
>> tz-aware
>> timestamp to tz-naive should be a metadata-only change.
>>
>> On Wed, Feb 15, 2023 at 4:19 PM Li Jin  wrote:
>>
>> > Asking (2) because IIUC this is a metadata operation that could be zero
>> > copy but I am not sure if this is actually the case.
>> >
>> > On Wed, Feb 15, 2023 at 10:17 AM Li Jin  wrote:
>> >
>> > > Hello!
>> > >
>> > > I have some questions about type casting memory usage with pyarrow
>> Table.
>> > > Let's say I have a pyarrow Table with 100 columns.
>> > >
>> > > (1) if I want to cast n columns to a different type (e.g., float to
>> int).
>> > > What is the smallest memory overhead that I can do? (memory overhead
>> of 1
>> > > column, n columns or 100 columns?)
>> > >
>> > > (2) if I want to cast n timestamp columns from tz-native to tz-UTC.
>> What
>> > > is the smallest memory overhead that I can do? (0, 1 column, n
>> columns or
>> > > 100 columns?)
>> > >
>> > > Thanks!
>> > > Li
>> > >
>> >
>>
>


Re: Question about memory usage and type casting using pyarrow Table

2023-02-15 Thread Li Jin
Hmm..something feels off here - I did the following experiment on Arrow 11
and casting timestamp-naive to int64 is much faster than casting
timestamp-naive to timestamp-utc:

In [16]: %time table.cast(schema_int)
CPU times: user 114 µs, sys: 30 µs, total: 144 µs
*Wall time: 231 µs*
Out[16]:
pyarrow.Table
time: int64

time: [[0,1,2,3,4,...,9995,9996,9997,9998,]]

In [17]: %time table.cast(schema_tz)
CPU times: user 119 ms, sys: 140 ms, total: 260 ms
*Wall time: 259 ms*
Out[17]:
pyarrow.Table
time: timestamp[ns, tz=UTC]

time: [[1970-01-01 00:00:00.0,1970-01-01
00:00:00.1,1970-01-01 00:00:00.2,1970-01-01
00:00:00.3,1970-01-01 00:00:00.4,...,1970-01-01
00:00:00.09995,1970-01-01 00:00:00.09996,1970-01-01
00:00:00.09997,1970-01-01 00:00:00.09998,1970-01-01
00:00:00.0]]

In [18]: table
Out[18]:
pyarrow.Table
time: timestamp[ns]

time: [[1970-01-01 00:00:00.0,1970-01-01
00:00:00.1,1970-01-01 00:00:00.2,1970-01-01
00:00:00.3,1970-01-01 00:00:00.4,...,1970-01-01
00:00:00.09995,1970-01-01 00:00:00.09996,1970-01-01
00:00:00.09997,1970-01-01 00:00:00.09998,1970-01-01
00:00:00.0]]

On Wed, Feb 15, 2023 at 2:52 PM Rok Mihevc  wrote:

> I'm not sure about (1) but I'm pretty sure for (2) doing a cast of tz-aware
> timestamp to tz-naive should be a metadata-only change.
>
> On Wed, Feb 15, 2023 at 4:19 PM Li Jin  wrote:
>
> > Asking (2) because IIUC this is a metadata operation that could be zero
> > copy but I am not sure if this is actually the case.
> >
> > On Wed, Feb 15, 2023 at 10:17 AM Li Jin  wrote:
> >
> > > Hello!
> > >
> > > I have some questions about type casting memory usage with pyarrow
> Table.
> > > Let's say I have a pyarrow Table with 100 columns.
> > >
> > > (1) if I want to cast n columns to a different type (e.g., float to
> int).
> > > What is the smallest memory overhead that I can do? (memory overhead
> of 1
> > > column, n columns or 100 columns?)
> > >
> > > (2) if I want to cast n timestamp columns from tz-native to tz-UTC.
> What
> > > is the smallest memory overhead that I can do? (0, 1 column, n columns
> or
> > > 100 columns?)
> > >
> > > Thanks!
> > > Li
> > >
> >
>


Re: Question about memory usage and type casting using pyarrow Table

2023-02-15 Thread Li Jin
Asking (2) because IIUC this is a metadata operation that could be zero
copy but I am not sure if this is actually the case.

On Wed, Feb 15, 2023 at 10:17 AM Li Jin  wrote:

> Hello!
>
> I have some questions about type casting memory usage with pyarrow Table.
> Let's say I have a pyarrow Table with 100 columns.
>
> (1) if I want to cast n columns to a different type (e.g., float to int).
> What is the smallest memory overhead that I can do? (memory overhead of 1
> column, n columns or 100 columns?)
>
> (2) if I want to cast n timestamp columns from tz-native to tz-UTC. What
> is the smallest memory overhead that I can do? (0, 1 column, n columns or
> 100 columns?)
>
> Thanks!
> Li
>


Question about memory usage and type casting using pyarrow Table

2023-02-15 Thread Li Jin
Hello!

I have some questions about type casting memory usage with pyarrow Table.
Let's say I have a pyarrow Table with 100 columns.

(1) if I want to cast n columns to a different type (e.g., float to int).
What is the smallest memory overhead that I can do? (memory overhead of 1
column, n columns or 100 columns?)

(2) if I want to cast n timestamp columns from tz-native to tz-UTC. What is
the smallest memory overhead that I can do? (0, 1 column, n columns or 100
columns?)

Thanks!
Li


Re: Build issues (Protobuf internal symbols)

2023-02-13 Thread Li Jin
"
In this case though, it's just that we purposely hide symbols by default.
If there's a use case, we could unhide this specific symbol (we did it for
one other Protobuf symbol) which would let you externally generate and use
the headers (as long as you take care not to actually include the generated
sources).
"
My knowledge of this symbol hiding is lacking, so I would like to take
this opportunity to learn about it if possible.

How is the hide/unhide done here? My understanding is that these Protobuf
symbols are included in libarrow_flight.so but not exposed via any
public header, and they are therefore "hidden". But if users run protoc to
generate the header files themselves, can they still access the symbols in
libarrow_flight.so?


On Mon, Feb 13, 2023 at 3:28 PM Paul Nienaber
 wrote:

> Yes I can build normally.  Good point about this being in a public header,
> as it would consequently be in any further application translation units...
> I'll look into whether we can refactor what's been done already.
> 
> From: David Li 
> Sent: Monday, February 13, 2023 12:15 PM
> To: dev@arrow.apache.org 
> Subject: Re: Build issues (Protobuf internal symbols)
>
> Oh, sorry, this is for Arrow development...
>
> A public header should not include an internal header (this includes the
> Protobuf-generated headers). But can you build the project without
> modification already?
>
> On Mon, Feb 13, 2023, at 15:02, Paul Nienaber wrote:
> > Hi David,
> >
> > This is for Arrow feature development, where the changes have been made
> > in client.h.  Am I possibly running into symbol visibility guards I'm
> > not expecting here and didn't spot yet?
> >
> > Paul
> > 
> > From: David Li 
> > Sent: Monday, February 13, 2023 11:11 AM
> > To: dev@arrow.apache.org 
> > Subject: Re: Build issues (Protobuf internal symbols)
> >
> > The Protobuf-generated code from Arrow is not meant for external
> > consumption, and in general, the way Protobuf works will make this
> > complicated. What is the use case here?
> >
> > (In fact, that header shouldn't even be available for you to include.
> > It seems you're linking/including it from a build of Arrow, but it
> > won't be there once Arrow is actually installed.)
> >
> > In this case though, it's just that we purposely hide symbols by
> > default. If there's a use case, we could unhide this specific symbol
> > (we did it for one other Protobuf symbol) which would let you
> > externally generate and use the headers (as long as you take care not
> > to actually include the generated sources).
> >
> > On Mon, Feb 13, 2023, at 14:05, Paul Nienaber wrote:
> >> Hi folks,
> >>
> >> I'm not familiar yet with all of the details of how Protobuf is set up
> >> to be built and included or whether there's a subtle difference between
> >> the CMake targets (headers vs cc sources), but we've recently gone and
> >> started touching some of the Protobuf types from a C++ header and are
> >> seeing build issues stemming from Protobuf-internal symbols/macros not
> >> being properly in scope.
> >>
> >> Working with /cpp/src/arrow/flight/sql/client.h we've included
> >> "arrow/flight/sql/FlightSql.pb.h" (this may itself be an issue, but
> >> using  >> sql/FlightSql), and we're seeing the following:
> >>
> >> [1/4] Building CXX object
> >> src/arrow/flight/sql/CMakeFiles/arrow_flight_sql_objlib.dir/client.cc.o
> >> FAILED:
> >> src/arrow/flight/sql/CMakeFiles/arrow_flight_sql_objlib.dir/client.cc.o
> >> /opt/homebrew/bin/ccache
> >> /Library/Developer/CommandLineTools/usr/bin/c++
> >> -DARROW_EXTRA_ERROR_CONTEXT -DARROW_FLIGHT_SQL_EXPORTING
> >> -DARROW_HAVE_NEON -DARROW_WITH_RE2 -DARROW_WITH_TIMING_TESTS
> >> -DARROW_WITH_ZLIB
> >> -DGRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS=grpc::experimental
> >> -DGRPC_USE_CERTIFICATE_VERIFIER
> >> -DGRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS -DURI_STATIC_BUILD
> >> -I/Users/pauln/bq/git/arrow/cpp/session-managemen-build-debug/src
> >> -I/Users/pauln/bq/git/arrow/cpp/src
> >> -I/Users/pauln/bq/git/arrow/cpp/src/generated -isystem
> >> /opt/homebrew/include -isystem
> >> /Users/pauln/bq/git/arrow/cpp/thirdparty/flatbuffers/include -isystem
> >> /Users/pauln/bq/git/arrow/cpp/thirdparty/hadoop/include -isystem
> >> /opt/homebrew/opt/openssl@1.1/include -isystem
> >>
> /Users/pauln/bq/git/arrow/cpp/session-managemen-build-debug/jemalloc_ep-prefix/src
> >> -fno-aligned-new  -Qunused-arguments -fcolor-diagnostics  -Wall -Wextra
> >> -Wdocumentation -Wshorten-64-to-32 -Wno-missing-braces
> >> -Wno-unused-parameter -Wno-constant-logical-operand
> >> -Wno-return-stack-address -Wno-unknown-warning-option -Wno-pass-failed
> >> -march=armv8-a  -g -Werror -O0 -ggdb -arch arm64 -isysroot
> >> /Library/Developer/CommandLineTools/SDKs/MacOSX13.1.sdk -fPIC
> >> -std=c++17 -MD -MT
> >> src/arrow/flight/sql/CMakeFiles/arrow_flight_sql_objlib.dir/client.cc.o
> >> -MF
> >>
> src/arrow/flight/sql/CMakeFiles/arrow_flight_sql_objlib.dir/client.cc.o.d
> >>

Creating dictionary encoded string in C++

2022-11-03 Thread Li Jin
Hello,

I am working on converting some internal data sources to Arrow data. One
particular set of data we have contains many string columns that can be
dictionary-encoded (basically string enums).

The current internal C++ API I am using gives me an iterator of "row"
objects; for each string column, the row object exposes a method
"getStringField(index)" that returns a "string_view", and I want to
construct a dictionary-encoded Arrow string column from it.

My question is:
(1) Is there a way to do this using the Arrow C++ API?
(2) Does the internal C++ API need to return something other than a
"string_view" to support this? Internally the string column is already
dictionary-encoded (although not in Arrow format) and it might already know
the dictionary and the encoded (int) value for each string field, but it
doesn't expose it now.

Thanks,
Li
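
For (1), a minimal sketch using the dictionary builder; the
RowIterator/getStringField shape mirrors the hypothetical internal API
described above, and recent Arrow versions accept std::string_view in Append.
For (2), if the source already knows the dictionary and the integer codes,
exposing those would allow building a DictionaryArray from indices plus
dictionary (DictionaryArray::FromArrays) without re-hashing every value.

```cpp
#include <memory>
#include <string_view>

#include "arrow/api.h"
#include "arrow/array/builder_dict.h"

// Build a dictionary<int, utf8> column by feeding each row's string_view into
// a DictionaryBuilder, which hashes values into a dictionary as it goes.
template <typename RowIterator>
arrow::Result<std::shared_ptr<arrow::Array>> BuildDictionaryColumn(
    RowIterator rows_begin, RowIterator rows_end, int field_index) {
  arrow::StringDictionaryBuilder builder;  // DictionaryBuilder<StringType>
  for (auto it = rows_begin; it != rows_end; ++it) {
    std::string_view value = it->getStringField(field_index);
    ARROW_RETURN_NOT_OK(builder.Append(value));
  }
  return builder.Finish();
}
```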


Re: [ANNOUNCE] New Arrow committer: Will Jones

2022-10-27 Thread Li Jin
congrats!

On Thu, Oct 27, 2022 at 9:03 PM Matt Topol  wrote:

> Congrats Will!
>
> On Thu, Oct 27, 2022 at 9:02 PM Ian Cook  wrote:
>
> > Congratulations Will!
> >
> > On Thu, Oct 27, 2022 at 19:56 Sutou Kouhei  wrote:
> >
> > > On behalf of the Arrow PMC, I'm happy to announce that Will Jones
> > > has accepted an invitation to become a committer on Apache
> > > Arrow. Welcome, and thank you for your contributions!
> > >
> > > kou
> > >
> >
>


[Acero] Error handling in ExecNode

2022-10-18 Thread Li Jin
Hello!

I am trying to implement an ExecNode in Acero that receives the input
batch, writes the batch to the FlightStreamWriter and then passes the batch
to the downstream node.

Looking at the API, I am thinking of doing something like:

void InputReceived(ExecNode* input, ExecBatch batch) {
  // turn batch into a RecordBatch
  // call flight_writer->WriteRecordBatch
  // call output_->InputReceived(this, batch);
}

My question is: how do I properly handle an error from WriteRecordBatch with
the ExecNode API?

Thanks,
Li
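
A rough sketch of one way to surface such an error, written against the
2022-era push-style ExecNode interface (InputReceived/ErrorReceived); member
names such as `flight_writer_` are assumptions, and the exact ExecNode
signatures have changed in later Acero versions, so treat this as illustrative
rather than the current API:

```cpp
void InputReceived(arrow::compute::ExecNode* input,
                   arrow::compute::ExecBatch batch) override {
  // Convert the ExecBatch to a RecordBatch and try to write it to Flight.
  auto maybe_rb = batch.ToRecordBatch(output_schema());
  arrow::Status st = maybe_rb.ok() ? flight_writer_->WriteRecordBatch(**maybe_rb)
                                   : maybe_rb.status();
  if (!st.ok()) {
    // Don't swallow the failure: report it downstream so the plan aborts
    // instead of continuing to push batches.
    outputs_[0]->ErrorReceived(this, std::move(st));
    return;
  }
  // Pass the original batch through to the downstream node.
  outputs_[0]->InputReceived(this, std::move(batch));
}
```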


Re: Substrait consumer for custom data sources

2022-10-13 Thread Li Jin
After some struggle, we finally managed to connect our internal data
source to Acero and execute a data load via pyarrow.substrait.run_query()!

We did end up temporarily modifying the substrait/options.h source code locally
and making kDefaultNamedTableProvider extern/global.

But since this doesn't sound like the correct way, I am happy to redo it
properly if someone lets me know the correct way :)

Li

On Thu, Oct 13, 2022 at 2:01 PM Li Jin  wrote:

> Going back to the default_exec_factory_registry idea, I think ultimately
> maybe we want registration API that looks like:
>
> """
> MetaRegistry* registry = compute::default_meta_registry();
> registry->RegisterNamedTableProvider(...);
> registry->exec_factory_registry()->AddFactory("my_custom_node",
> MakeMyCustomNode)
> ...
> """
>
> On Thu, Oct 13, 2022 at 1:32 PM Li Jin  wrote:
>
>> Weston - was trying the pyarrow approach you suggested:
>>
>> >def custom_source(endpoint):
>>   return pc.Declaration("my_custom_source", create_my_custom_options())
>>
>> (1) I didn't see "Declaration" under pyarrow.compute - which package is
>> this under?
>> (2) What Python object should I return with  create_my_custom_options()?
>> Currently I only have a C++ class for my custom option.
>>
>> On Thu, Oct 13, 2022 at 12:58 PM Li Jin  wrote:
>>
>>> > I may be assuming here but I think your problem is more that there is
>>> no way to more flexibly describe a source in python and less that you
>>> need to change the default.
>>>
>>> Correct.
>>>
>>> > For example, if you could do something like this (in pyarrow) would it
>>> work?
>>> I could try to see if that works. I feel registering the extension in
>>> C++ via one initialization seems cleaner to me because there are many other
>>> extension points that we initialize (add registering in the 
>>> default_exec_factory_registry
>>> similar to
>>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/plan.cc#L30).
>>> Having some of the initialization in Pyarrow and some in Acero is a bit
>>> complicated IMO.
>>>
>>> Maybe the proper long term fix is to add something similar to
>>> compute::default_exec_factory_registry for all extension points (named
>>> table, substrait extension message) and the user can register custom stuff
>>> similar to how they can register via default_exec_factory_registry?)
>>>
>>> On Thu, Oct 13, 2022 at 12:41 PM Weston Pace 
>>> wrote:
>>>
>>>> > Does that sound like a reasonable way to do this?
>>>>
>>>> It's not ideal.
>>>>
>>>> I may be assuming here but I think your problem is more that there is
>>>> no way to more flexibly describe a source in python and less that you
>>>> need to change the default.
>>>>
>>>> For example, if you could do something like this (in pyarrow) would it
>>>> work?
>>>>
>>>> ```
>>>> def custom_source(endpoint):
>>>>   return pc.Declaration("my_custom_source", create_my_custom_options())
>>>>
>>>> def table_provider(names):
>>>>   return custom_sources[names[0]]
>>>>
>>>> pa.substrait.run_query(my_plan, table_provider=table_provider)
>>>> ```
>>>>
>>>> On Thu, Oct 13, 2022 at 8:24 AM Li Jin  wrote:
>>>> >
>>>> > We did some work around this recently and think there needs to be some
>>>> > small change to allow users to override this default provider. I will
>>>> > explain in more details:
>>>> >
>>>> > (1) Since the variable is defined as static in the substrait/options.h
>>>> > file, each translation unit will have a separate copy of the
>>>> > kDefaultNamedTableProvider
>>>> > variable. And therefore, the user cannot really change the default
>>>> that is
>>>> > used here:
>>>> >
>>>> https://github.com/apache/arrow/blob/master/python/pyarrow/_substrait.pyx#L125
>>>> >
>>>> > In order to allow user to override the kDefaultNamedTableProvider (and
>>>> > change the behavior of
>>>> >
>>>> https://github.com/apache/arrow/blob/master/python/pyarrow/_substrait.pyx#L125
>>>> > to use a custom NamedTableProvider), we need to
>>>> > (1) in substrait/options.hh, cha

Re: Substrait consumer for custom data sources

2022-10-13 Thread Li Jin
Going back to the default_exec_factory_registry idea, I think we may
ultimately want a registration API that looks like:

"""
MetaRegistry* registry = compute::default_meta_registry();
registry->RegisterNamedTableProvider(...);
registry->exec_factory_registry()->AddFactory("my_custom_node",
MakeMyCustomNode)
...
"""

On Thu, Oct 13, 2022 at 1:32 PM Li Jin  wrote:

> Weston - was trying the pyarrow approach you suggested:
>
> >def custom_source(endpoint):
>   return pc.Declaration("my_custom_source", create_my_custom_options())
>
> (1) I didn't see "Declaration" under pyarrow.compute - which package is
> this under?
> (2) What Python object should I return with  create_my_custom_options()?
> Currently I only have a C++ class for my custom option.
>
> On Thu, Oct 13, 2022 at 12:58 PM Li Jin  wrote:
>
>> > I may be assuming here but I think your problem is more that there is
>> no way to more flexibly describe a source in python and less that you
>> need to change the default.
>>
>> Correct.
>>
>> > For example, if you could do something like this (in pyarrow) would it
>> work?
>> I could try to see if that works. I feel registering the extension in C++
>> via one initialization seems cleaner to me because there are many other
>> extension points that we initialize (add registering in the 
>> default_exec_factory_registry
>> similar to
>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/plan.cc#L30).
>> Having some of the initialization in Pyarrow and some in Acero is a bit
>> complicated IMO.
>>
>> Maybe the proper long term fix is to add something similar to
>> compute::default_exec_factory_registry for all extension points (named
>> table, substrait extension message) and the user can register custom stuff
>> similar to how they can register via default_exec_factory_registry?)
>>
>> On Thu, Oct 13, 2022 at 12:41 PM Weston Pace 
>> wrote:
>>
>>> > Does that sound like a reasonable way to do this?
>>>
>>> It's not ideal.
>>>
>>> I may be assuming here but I think your problem is more that there is
>>> no way to more flexibly describe a source in python and less that you
>>> need to change the default.
>>>
>>> For example, if you could do something like this (in pyarrow) would it
>>> work?
>>>
>>> ```
>>> def custom_source(endpoint):
>>>   return pc.Declaration("my_custom_source", create_my_custom_options())
>>>
>>> def table_provider(names):
>>>   return custom_sources[names[0]]
>>>
>>> pa.substrait.run_query(my_plan, table_provider=table_provider)
>>> ```
>>>
>>> On Thu, Oct 13, 2022 at 8:24 AM Li Jin  wrote:
>>> >
>>> > We did some work around this recently and think there needs to be some
>>> > small change to allow users to override this default provider. I will
>>> > explain in more details:
>>> >
>>> > (1) Since the variable is defined as static in the substrait/options.h
>>> > file, each translation unit will have a separate copy of the
>>> > kDefaultNamedTableProvider
>>> > variable. And therefore, the user cannot really change the default
>>> that is
>>> > used here:
>>> >
>>> https://github.com/apache/arrow/blob/master/python/pyarrow/_substrait.pyx#L125
>>> >
>>> > In order to allow user to override the kDefaultNamedTableProvider (and
>>> > change the behavior of
>>> >
>>> https://github.com/apache/arrow/blob/master/python/pyarrow/_substrait.pyx#L125
>>> > to use a custom NamedTableProvider), we need to
>>> > (1) in substrait/options.hh, change the definition of
>>> > kDefaultNamedTableProvider to be an extern declaration
>>> > (2) move the definition of kDefaultNamedTableProvider to an
>>> > substrait/options.cc file
>>> >
>>> > We are still testing this but based on my limited C++ knowledge, I
>>> > think this would allow users to do
>>> > """
>>> > #include "arrow/engine/substrait/options.h"
>>> >
>>> > void initialize() {
>>> > arrow::engine::kDefaultNamedTableProvider =
>>> > some_custom_name_table_provider;
>>> > }
>>> > """
>>> > And then calling `pa.substrat.run_query" should pick up the custom name
>>> > table provider.
>>> 

Re: Substrait consumer for custom data sources

2022-10-13 Thread Li Jin
Weston - was trying the pyarrow approach you suggested:

>def custom_source(endpoint):
  return pc.Declaration("my_custom_source", create_my_custom_options())

(1) I didn't see "Declaration" under pyarrow.compute - which package is
this under?
(2) What Python object should I return with  create_my_custom_options()?
Currently I only have a C++ class for my custom option.

On Thu, Oct 13, 2022 at 12:58 PM Li Jin  wrote:

> > I may be assuming here but I think your problem is more that there is
> no way to more flexibly describe a source in python and less that you
> need to change the default.
>
> Correct.
>
> > For example, if you could do something like this (in pyarrow) would it
> work?
> I could try to see if that works. I feel registering the extension in C++
> via one initialization seems cleaner to me because there are many other
> extension points that we initialize (add registering in the 
> default_exec_factory_registry
> similar to
> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/plan.cc#L30).
> Having some of the initialization in Pyarrow and some in Acero is a bit
> complicated IMO.
>
> Maybe the proper long term fix is to add something similar to
> compute::default_exec_factory_registry for all extension points (named
> table, substrait extension message) and the user can register custom stuff
> similar to how they can register via default_exec_factory_registry?)
>
> On Thu, Oct 13, 2022 at 12:41 PM Weston Pace 
> wrote:
>
>> > Does that sound like a reasonable way to do this?
>>
>> It's not ideal.
>>
>> I may be assuming here but I think your problem is more that there is
>> no way to more flexibly describe a source in python and less that you
>> need to change the default.
>>
>> For example, if you could do something like this (in pyarrow) would it
>> work?
>>
>> ```
>> def custom_source(endpoint):
>>   return pc.Declaration("my_custom_source", create_my_custom_options())
>>
>> def table_provider(names):
>>   return custom_sources[names[0]]
>>
>> pa.substrait.run_query(my_plan, table_provider=table_provider)
>> ```
>>
>> On Thu, Oct 13, 2022 at 8:24 AM Li Jin  wrote:
>> >
>> > We did some work around this recently and think there needs to be some
>> > small change to allow users to override this default provider. I will
>> > explain in more details:
>> >
>> > (1) Since the variable is defined as static in the substrait/options.h
>> > file, each translation unit will have a separate copy of the
>> > kDefaultNamedTableProvider
>> > variable. And therefore, the user cannot really change the default that
>> is
>> > used here:
>> >
>> https://github.com/apache/arrow/blob/master/python/pyarrow/_substrait.pyx#L125
>> >
>> > In order to allow user to override the kDefaultNamedTableProvider (and
>> > change the behavior of
>> >
>> https://github.com/apache/arrow/blob/master/python/pyarrow/_substrait.pyx#L125
>> > to use a custom NamedTableProvider), we need to
>> > (1) in substrait/options.hh, change the definition of
>> > kDefaultNamedTableProvider to be an extern declaration
>> > (2) move the definition of kDefaultNamedTableProvider to an
>> > substrait/options.cc file
>> >
>> > We are still testing this but based on my limited C++ knowledge, I
>> > think this would allow users to do
>> > """
>> > #include "arrow/engine/substrait/options.h"
>> >
>> > void initialize() {
>> > arrow::engine::kDefaultNamedTableProvider =
>> > some_custom_name_table_provider;
>> > }
>> > """
>> > And then calling `pa.substrat.run_query" should pick up the custom name
>> > table provider.
>> >
>> > Does that sound like a reasonable way to do this?
>> >
>> >
>> >
>> >
>> > On Tue, Sep 27, 2022 at 1:59 PM Li Jin  wrote:
>> >
>> > > Thanks both. I think NamedTableProvider is close to what I want, and
>> like
>> > > Weston said, the tricky bit is how to use a custom NamedTableProvider
>> when
>> > > calling the pyarrow substrait API.
>> > >
>> > > It's a little hacky but I *think* I can override the value
>> "kDefaultNamedTableProvider"
>> > > here and pass "table_provider=None" then it "should" work:
>> > >
>> > >
>> https://github.com/apache/arrow/blob/529f653dfa5888

Re: Substrait consumer for custom data sources

2022-10-13 Thread Li Jin
> I may be assuming here but I think your problem is more that there is
no way to more flexibly describe a source in python and less that you
need to change the default.

Correct.

> For example, if you could do something like this (in pyarrow) would it
work?
I could try to see if that works. That said, registering the extension in C++
via a single initialization call seems cleaner to me, because there are many
other extension points that we initialize (e.g. registering in the
default_exec_factory_registry similar to
https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/plan.cc#L30).
Having some of the initialization in PyArrow and some in Acero is a bit
complicated IMO.

Maybe the proper long-term fix is to add something similar to
compute::default_exec_factory_registry for all extension points (named
table, Substrait extension message), so the user can register custom
handlers the same way they register via default_exec_factory_registry?

On Thu, Oct 13, 2022 at 12:41 PM Weston Pace  wrote:

> > Does that sound like a reasonable way to do this?
>
> It's not ideal.
>
> I may be assuming here but I think your problem is more that there is
> no way to more flexibly describe a source in python and less that you
> need to change the default.
>
> For example, if you could do something like this (in pyarrow) would it
> work?
>
> ```
> def custom_source(endpoint):
>   return pc.Declaration("my_custom_source", create_my_custom_options())
>
> def table_provider(names):
>   return custom_sources[names[0]]
>
> pa.substrait.run_query(my_plan, table_provider=table_provider)
> ```
>
> On Thu, Oct 13, 2022 at 8:24 AM Li Jin  wrote:
> >
> > We did some work around this recently and think there needs to be some
> > small change to allow users to override this default provider. I will
> > explain in more details:
> >
> > (1) Since the variable is defined as static in the substrait/options.h
> > file, each translation unit will have a separate copy of the
> > kDefaultNamedTableProvider
> > variable. And therefore, the user cannot really change the default that
> is
> > used here:
> >
> https://github.com/apache/arrow/blob/master/python/pyarrow/_substrait.pyx#L125
> >
> > In order to allow user to override the kDefaultNamedTableProvider (and
> > change the behavior of
> >
> https://github.com/apache/arrow/blob/master/python/pyarrow/_substrait.pyx#L125
> > to use a custom NamedTableProvider), we need to
> > (1) in substrait/options.hh, change the definition of
> > kDefaultNamedTableProvider to be an extern declaration
> > (2) move the definition of kDefaultNamedTableProvider to an
> > substrait/options.cc file
> >
> > We are still testing this but based on my limited C++ knowledge, I
> > think this would allow users to do
> > """
> > #include "arrow/engine/substrait/options.h"
> >
> > void initialize() {
> > arrow::engine::kDefaultNamedTableProvider =
> > some_custom_name_table_provider;
> > }
> > """
> > And then calling `pa.substrat.run_query" should pick up the custom name
> > table provider.
> >
> > Does that sound like a reasonable way to do this?
> >
> >
> >
> >
> > On Tue, Sep 27, 2022 at 1:59 PM Li Jin  wrote:
> >
> > > Thanks both. I think NamedTableProvider is close to what I want, and
> like
> > > Weston said, the tricky bit is how to use a custom NamedTableProvider
> when
> > > calling the pyarrow substrait API.
> > >
> > > It's a little hacky but I *think* I can override the value
> "kDefaultNamedTableProvider"
> > > here and pass "table_provider=None" then it "should" work:
> > >
> > >
> https://github.com/apache/arrow/blob/529f653dfa58887522af06028e5c32e8dd1a14ea/cpp/src/arrow/engine/substrait/options.h#L66
> > >
> > > I am going to give that a shot once I pull/build Arrow default into our
> > > internal build system.
> > >
> > >
> > >
> > >
> > > On Tue, Sep 27, 2022 at 10:50 AM Benjamin Kietzman <
> bengil...@gmail.com>
> > > wrote:
> > >
> > >> It seems to me that your use case could be handled by defining a
> custom
> > >> NamedTableProvider and
> > >> assigning this to ConversionOptions::named_table_provider. This was
> added
> > >> in
> > >> https://github.com/apache/arrow/pull/13613 to provide user
> configurable
> > >> dispatching for named tables;
> > >> if it doesn't address your use case then we might

Re: Substrait consumer for custom data sources

2022-10-13 Thread Li Jin
We did some work around this recently and think there needs to be a
small change to allow users to override this default provider. I will
explain in more detail:

(1) Since the variable is defined as static in the substrait/options.h
file, each translation unit gets a separate copy of the
kDefaultNamedTableProvider variable. Therefore, the user cannot really
change the default that is used here:
https://github.com/apache/arrow/blob/master/python/pyarrow/_substrait.pyx#L125

In order to allow users to override kDefaultNamedTableProvider (and
change the behavior of
https://github.com/apache/arrow/blob/master/python/pyarrow/_substrait.pyx#L125
to use a custom NamedTableProvider), we need to:
(1) in substrait/options.h, change the definition of
kDefaultNamedTableProvider to an extern declaration
(2) move the definition of kDefaultNamedTableProvider to a
substrait/options.cc file

We are still testing this, but based on my limited C++ knowledge, I
think this would allow users to do
"""
#include "arrow/engine/substrait/options.h"

void initialize() {
  arrow::engine::kDefaultNamedTableProvider = some_custom_name_table_provider;
}
"""
And then calling `pa.substrait.run_query` should pick up the custom named
table provider.

Does that sound like a reasonable way to do this?
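
For illustration, the header/source split proposed above would look roughly
like this (plain C++; MakeDefaultNamedTableProvider is a hypothetical
placeholder for however the current default is constructed):

```cpp
// substrait/options.h (before): `static` at namespace scope gives every
// translation unit its own copy, so assigning to it from user code never
// touches the copy that the Arrow library itself reads.
static NamedTableProvider kDefaultNamedTableProvider = MakeDefaultNamedTableProvider();

// substrait/options.h (proposed): declaration only -- one shared object.
extern NamedTableProvider kDefaultNamedTableProvider;

// substrait/options.cc (proposed): the single definition.
NamedTableProvider kDefaultNamedTableProvider = MakeDefaultNamedTableProvider();
```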




On Tue, Sep 27, 2022 at 1:59 PM Li Jin  wrote:

> Thanks both. I think NamedTableProvider is close to what I want, and like
> Weston said, the tricky bit is how to use a custom NamedTableProvider when
> calling the pyarrow substrait API.
>
> It's a little hacky but I *think* I can override the value 
> "kDefaultNamedTableProvider"
> here and pass "table_provider=None" then it "should" work:
>
> https://github.com/apache/arrow/blob/529f653dfa58887522af06028e5c32e8dd1a14ea/cpp/src/arrow/engine/substrait/options.h#L66
>
> I am going to give that a shot once I pull/build Arrow default into our
> internal build system.
>
>
>
>
> On Tue, Sep 27, 2022 at 10:50 AM Benjamin Kietzman 
> wrote:
>
>> It seems to me that your use case could be handled by defining a custom
>> NamedTableProvider and
>> assigning this to ConversionOptions::named_table_provider. This was added
>> in
>> https://github.com/apache/arrow/pull/13613 to provide user configurable
>> dispatching for named tables;
>> if it doesn't address your use case then we might want to create a JIRA to
>> extend it.
>>
>> On Tue, Sep 27, 2022 at 10:41 AM Li Jin  wrote:
>>
>> > I did some more digging into this and have some ideas -
>> >
>> > Currently, the logic for deserialization named table is:
>> >
>> >
>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/engine/substrait/relation_internal.cc#L129
>> > and it will look up named tables from a user provided dictionary from
>> > string -> arrow Table.
>> >
>> > My idea is to make some short term changes to allow named tables to be
>> > dispatched differently (This logic can be reverted/removed once we
>> figure
>> > out the proper way to support custom data sources, perhaps via substrait
>> > Extensions.), specifically:
>> >
>> > (1) The user creates named table with uris for custom data source, i.e.,
>> > "my_datasource://tablename?begin=20200101&end=20210101"
>> > (2) In the substrait consumer, allowing user to register custom dispatch
>> > rules based on uri scheme (similar to how exec node registry works),
>> i.e.,
>> > sth like:
>> >
>> > substrait_named_table_registry.add("my_datasource", deser_my_datasource)
>> > and deser_my_datasource is a function that takes the NamedTable
>> substrait
>> > message and returns a declaration.
>> >
>> > I know doing this just for named tables might not be a very general
>> > solution but seems the easiest path forward, and we can always remove
>> this
>> > later in favor of a more generic solution.
>> >
>> > Thoughts?
>> >
>> > Li
>> >
>> >
>> >
>> >
>> >
>> > On Mon, Sep 26, 2022 at 10:58 AM Li Jin  wrote:
>> >
>> > > Hello!
>> > >
>> > > I am working on adding a custom data source node in Acero. I have a
>> few
>> > > previous threads related to this topic.
>> > >
>> > > Currently, I am able to register my custom factory method with Acero
>> and
>> > > create a Custom source node, i.e., I can register and execute this
>> with
>> > > Acero:
>> > >
>> > > MySourceNodeOptions source_options = ...
>> > > Declaration source{"my_source", source_option}
>> > >
>> > > The next step I want to do is to pass this through to the Acero
>> substrait
>> > > consumer. From previous discussions, I am going to use "NamedTable ''
>> as
>> > a
>> > > temporary way to define my custom data source in substrait. My
>> question
>> > is
>> > > this:
>> > >
>> > > What I need to do in substrait in order to register my own substrait
>> > > consumer rule/function for deserializing my custom named table
>> protobuf
>> > > message into the declaration above. If this is not supported right
>> now,
>> > > what is a reasonable/minimal change to make this work?
>> > >
>> > > Thanks,
>> > > Li
>> > >
>> >
>>
>


Re: Question about pyarrow.substrait.run_query

2022-10-13 Thread Li Jin
Thank you Weston!

On Thu, Oct 13, 2022 at 1:05 AM Weston Pace  wrote:

> 1. Yes.
> 2. I was going to say yes but...on closer examination...it appears
> that it is not applying backpressure.
>
> The SinkNode accumulates batches in a queue and applies backpressure.
> I thought we were using a sink node since it is the normal "accumulate
> batches into a queue" sink.  However, the Substrait<->Python
> integration is not using a sink node but instead a custom
> ConsumingSinkNode (SubstraitSinkConsumer).  The SubstraitSinkConsumer
> does accumulate batches in a queue (just like the sink node) but it is
> not handling backpressure.  I've created [1] to track this.
>
> [1] https://issues.apache.org/jira/browse/ARROW-18025
>
> On Wed, Oct 12, 2022 at 9:02 AM Li Jin  wrote:
> >
> > Hello!
> >
> > I have some questions about how "pyarrow.substrait.run_query" works.
> >
> > Currently run_query returns a record batch reader. Since Acero is a
> > push-based model and the reader is pull-based, I'd assume the reader
> object
> > somehow accumulates the batches that are pushed to it. And I wonder
> >
> > (1) Does the output batches keep accumulating in the reader object, until
> > someone reads from the reader?
> > (2) Are there any back pressure mechanisms implemented to prevent OOM if
> > data doesn't get pulled from the reader? (Bounded cache in the reader
> > object?)
> >
> > Thanks,
> > Li
>


Question about pyarrow.substrait.run_query

2022-10-12 Thread Li Jin
Hello!

I have some questions about how "pyarrow.substrait.run_query" works.

Currently run_query returns a record batch reader. Since Acero is a
push-based model and the reader is pull-based, I'd assume the reader object
somehow accumulates the batches that are pushed to it. And I wonder

(1) Do the output batches keep accumulating in the reader object until
someone reads from the reader?
(2) Are there any back pressure mechanisms implemented to prevent OOM if
data doesn't get pulled from the reader? (Bounded cache in the reader
object?)

Thanks,
Li


Re: Pandas backend for Substrait

2022-10-06 Thread Li Jin
Disclaimer: not an ibis-substrait dev here.

ibis-substrait has a "decompiler":
https://github.com/ibis-project/ibis-substrait/blob/main/ibis_substrait/tests/compiler/test_decompiler.py
It takes a Substrait plan and returns an ibis expression; you can then run the
ibis expression with ibis's pandas backend:
https://github.com/ibis-project/ibis/tree/master/ibis/backends/pandas


On Thu, Oct 6, 2022 at 2:37 PM Niranda Perera 
wrote:

> Hi all,
>
> I was wondering if there is a pandas backend for substrait?
> Is there any other backend we could play around with?
>
> Best
> --
> Niranda Perera
> https://niranda.dev/
> @n1r44 
>


Re: Integration between ibis-substrait and Acero

2022-10-05 Thread Li Jin
Ok I think I got a working version now:

t = ibis.table([("a", "int64"), ("b", "int64")], name="table0")

test_table_0 = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5,
6]})



result = self.compiler.compile(t)



def table_provider(names):

if not names:

raise Exception("No names provided")

elif names[0] == 'table0':

return test_table_0

else:

raise Exception(f"Unknown table name {names}")



reader =
pa.substrait.run_query(pa.py_buffer(result.SerializeToString()),
table_provider)

result_table = reader.read_all()



self.assertTrue(result_table == test_table_0)

First successful run with ibis/substrait/acero - Hooray

On Wed, Oct 5, 2022 at 2:33 PM Li Jin  wrote:

> Hmm. Thanks for the update - Now I searched the code more, it seems
> perhaps I should be using "compile" rather than "translate";
>
>
> https://github.com/ibis-project/ibis-substrait/blob/main/ibis_substrait/compiler/core.py#L82
>
> Let me try some more
>
> On Wed, Oct 5, 2022 at 1:42 PM Will Jones  wrote:
>
>> Hi Li Jin,
>>
>> The original segfault seems to occur because you are passing a Python
>> bytes
>> object and not a PyArrow Buffer object. You can wrap the bytes object
>> using
>> pa.py_buffer():
>>
>> pa.substrait.run_query(pa.py_buffer(result_bytes), table_provider)
>>
>>
>> That being said, when I run your full example with that, we now get a
>> different error similar to what you get when you pass in through JSON:
>>
>> Traceback (most recent call last):
>>   File "", line 1, in 
>>   File "pyarrow/_substrait.pyx", line 140, in pyarrow._substrait.run_query
>> c_reader = GetResultValue(c_res_reader)
>>   File "pyarrow/error.pxi", line 144, in
>> pyarrow.lib.pyarrow_internal_check_status
>> return check_status(status)
>>   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
>> raise ArrowInvalid(message)
>> pyarrow.lib.ArrowInvalid: ExecPlan has no node
>>
>> /Users/willjones/Documents/arrows/arrow/cpp/src/arrow/engine/substrait/util.cc:82
>>  plan_->Validate()
>>
>> /Users/willjones/Documents/arrows/arrow/cpp/src/arrow/engine/substrait/util.cc:131
>>  executor.Execute()
>>
>>
>> We get the same error even if I add operations onto the plan:
>>
>> result = translate(t.group_by("a").mutate(z = t.b.sum()), compiler)
>> print(result)
>>
>>
>> project {
>>   input {
>> read {
>>   base_schema {
>> names: "a"
>> names: "b"
>> struct {
>>   types {
>> i64 {
>>   nullability: NULLABILITY_NULLABLE
>> }
>>   }
>>   types {
>> i64 {
>>   nullability: NULLABILITY_NULLABLE
>> }
>>   }
>>   nullability: NULLABILITY_REQUIRED
>> }
>>   }
>>   named_table {
>> names: "table0"
>>   }
>> }
>>   }
>>   expressions {
>> selection {
>>   direct_reference {
>> struct_field {
>> }
>>   }
>>   root_reference {
>>   }
>> }
>>   }
>>   expressions {
>> selection {
>>   direct_reference {
>> struct_field {
>>   field: 1
>> }
>>   }
>>   root_reference {
>>   }
>> }
>>   }
>>   expressions {
>> window_function {
>>   function_reference: 1
>>   partitions {
>> selection {
>>   direct_reference {
>> struct_field {
>> }
>>   }
>>   root_reference {
>>   }
>> }
>>   }
>>   upper_bound {
>> unbounded {
>> }
>>   }
>>   lower_bound {
>> unbounded {
>> }
>>   }
>>   phase: AGGREGATION_PHASE_INITIAL_TO_RESULT
>>   output_type {
>> i64 {
>>   nullability: NULLABILITY_NULLABLE
>> }
>>   }
>>   arguments {
>> value {
>>   selection {
>> direct_reference {
>>   struct_field {
>> field: 1
>&g

Re: Integration between ibis-substrait and Acero

2022-10-05 Thread Li Jin
Hmm. Thanks for the update - now that I have searched the code more, it seems
perhaps I should be using "compile" rather than "translate":

https://github.com/ibis-project/ibis-substrait/blob/main/ibis_substrait/compiler/core.py#L82

Let me try some more.

On Wed, Oct 5, 2022 at 1:42 PM Will Jones  wrote:

> Hi Li Jin,
>
> The original segfault seems to occur because you are passing a Python bytes
> object and not a PyArrow Buffer object. You can wrap the bytes object using
> pa.py_buffer():
>
> pa.substrait.run_query(pa.py_buffer(result_bytes), table_provider)
>
>
> That being said, when I run your full example with that, we now get a
> different error similar to what you get when you pass in through JSON:
>
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "pyarrow/_substrait.pyx", line 140, in pyarrow._substrait.run_query
> c_reader = GetResultValue(c_res_reader)
>   File "pyarrow/error.pxi", line 144, in
> pyarrow.lib.pyarrow_internal_check_status
> return check_status(status)
>   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
> raise ArrowInvalid(message)
> pyarrow.lib.ArrowInvalid: ExecPlan has no node
>
> /Users/willjones/Documents/arrows/arrow/cpp/src/arrow/engine/substrait/util.cc:82
>  plan_->Validate()
>
> /Users/willjones/Documents/arrows/arrow/cpp/src/arrow/engine/substrait/util.cc:131
>  executor.Execute()
>
>
> We get the same error even if I add operations onto the plan:
>
> result = translate(t.group_by("a").mutate(z = t.b.sum()), compiler)
> print(result)
>
>
> project {
>   input {
> read {
>   base_schema {
> names: "a"
> names: "b"
> struct {
>   types {
> i64 {
>   nullability: NULLABILITY_NULLABLE
> }
>   }
>   types {
> i64 {
>   nullability: NULLABILITY_NULLABLE
> }
>   }
>   nullability: NULLABILITY_REQUIRED
> }
>   }
>   named_table {
> names: "table0"
>   }
> }
>   }
>   expressions {
> selection {
>   direct_reference {
> struct_field {
> }
>   }
>   root_reference {
>   }
> }
>   }
>   expressions {
> selection {
>   direct_reference {
> struct_field {
>   field: 1
> }
>   }
>   root_reference {
>   }
> }
>   }
>   expressions {
> window_function {
>   function_reference: 1
>   partitions {
> selection {
>   direct_reference {
> struct_field {
> }
>   }
>   root_reference {
>   }
> }
>   }
>   upper_bound {
> unbounded {
> }
>   }
>   lower_bound {
> unbounded {
> }
>   }
>   phase: AGGREGATION_PHASE_INITIAL_TO_RESULT
>   output_type {
> i64 {
>   nullability: NULLABILITY_NULLABLE
> }
>   }
>   arguments {
> value {
>   selection {
> direct_reference {
>   struct_field {
> field: 1
>   }
> }
> root_reference {
> }
>   }
> }
>   }
> }
>   }
> }
>
>
> Full reproduction:
>
> import pyarrow as pa
> import pyarrow.substrait
> import ibis
> from ibis_substrait.compiler.core import SubstraitCompiler
> from ibis_substrait.compiler.translate import translate
>
>
> compiler = SubstraitCompiler()
>
>
> t = ibis.table([("a", "int64"), ("b", "int64")], name="table0")
> result = translate(t.group_by("a").mutate(z = t.b.sum()), compiler)
>
> def table_provider(names):
> if not names:
> raise Exception("No names provided")
>     elif names[0] == 'table0':
> return test_table_0
> else:
> raise Exception(f"Unknown table name {names}")
>
>
> test_table_0 = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
>
> result_bytes = result.SerializeToString()
>
> pa.substrait.run_query(pa.py_buffer(result_bytes), table_provider)
>
> Best,
>
> Will Jones
>
> On Tue, Oct 4, 2022 at 12:30 PM Li Jin  wrote:
>
> > For reference, this is the "relations" entry that I was referring to:
> >
> >
> https://github.com/apache/arrow/blob/master/python/pyarrow/test

Re: Integration between ibis-substrait and Acero

2022-10-04 Thread Li Jin
For reference, this is the "relations" entry that I was referring to:
https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_substrait.py#L186

On Tue, Oct 4, 2022 at 3:28 PM Li Jin  wrote:

> So I made some progress with updated code:
>
> t = ibis.table([("a", "int64"), ("b", "int64")], name="table0")
>
> test_table_0 = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5,
> 6]})
>
>
>
> result = translate(t, self.compiler)
>
>
>
> def table_provider(names):
>
> if not names:
>
> raise Exception("No names provided")
>
> elif names[0] == 'table0':
>
> return test_table_0
>
> else:
>
> raise Exception(f"Unknown table name {names}")
>
>
>
> print(result)
>
> result_buf =
> pa._substrait._parse_json_plan(tobytes(MessageToJson(result)))
>
>
>
> pa.substrait.run_query(result_buf, table_provider)
>
> I think now the plan is passed properly and I got a "ArrowInvalid: Empty
> substrait plan is passed"
>
>
> Looking the plan reproduces by ibis-substrait, it looks like doesn't match
> the expected format of Acero consumer. In particular, it looks like the
> plan produced by ibis-substrait doesn't have a "relations" entry - any
> thoughts on how this can be fixed? (I don't know if I am using the API
> wrong or some format inconsistency between the two)
>
> On Tue, Oct 4, 2022 at 1:54 PM Li Jin  wrote:
>
>> Hi,
>>
>> I am testing integration between ibis-substrait and Acero but hit a
>> segmentation fault. I think this might be cause the way I am
>> integrating these two libraries are wrong, here is my code:
>>
>>
>> class BasicTests(unittest.TestCase):
>>
>> """Test basic features"""
>>
>>
>>
>>
>>
>> @classmethod
>>
>> def setUpClass(cls):
>>
>> cls.compiler = SubstraitCompiler()
>>
>>
>>
>> def test_named_table(self):
>>
>> """Test basic"""
>>
>> t = ibis.table([("a", "int64"), ("b", "int64")], name="table0")
>>
>> result = translate(t, self.compiler)
>>
>>
>>
>> def table_provider(names):
>>
>> if not names:
>>
>> raise Exception("No names provided")
>>
>> elif names[0] == 'table0':
>>
>> return test_table_0
>>
>> else:
>>
>> raise Exception(f"Unknown table name {names}")
>>
>>
>>
>> test_table_0 = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5,
>> 6]})
>>
>>
>>
>> print(type(result))
>>
>> print(result)
>>
>> result_bytes = result.SerializeToString()
>>
>>
>>
>> pa.substrait.run_query(result_bytes, table_provider)
>>
>>
>> I wonder if someone has tried integration between these two before and
>> can share some working code?
>>
>


Re: Integration between ibis-substrait and Acero

2022-10-04 Thread Li Jin
So I made some progress with updated code:

t = ibis.table([("a", "int64"), ("b", "int64")], name="table0")

test_table_0 = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5,
6]})



result = translate(t, self.compiler)



def table_provider(names):

if not names:

raise Exception("No names provided")

elif names[0] == 'table0':

return test_table_0

else:

raise Exception(f"Unknown table name {names}")



print(result)

result_buf =
pa._substrait._parse_json_plan(tobytes(MessageToJson(result)))



pa.substrait.run_query(result_buf, table_provider)

I think the plan is now passed properly, and I got an "ArrowInvalid: Empty
substrait plan is passed".

Looking at the plan produced by ibis-substrait, it doesn't seem to match
the format expected by the Acero consumer. In particular, it looks like the
plan produced by ibis-substrait doesn't have a "relations" entry - any
thoughts on how this can be fixed? (I don't know if I am using the API
wrong or if there is a format inconsistency between the two.)

On Tue, Oct 4, 2022 at 1:54 PM Li Jin  wrote:

> Hi,
>
> I am testing integration between ibis-substrait and Acero but hit a
> segmentation fault. I think this might be cause the way I am
> integrating these two libraries are wrong, here is my code:
>
>
> class BasicTests(unittest.TestCase):
>
> """Test basic features"""
>
>
>
>
>
> @classmethod
>
> def setUpClass(cls):
>
> cls.compiler = SubstraitCompiler()
>
>
>
> def test_named_table(self):
>
> """Test basic"""
>
> t = ibis.table([("a", "int64"), ("b", "int64")], name="table0")
>
> result = translate(t, self.compiler)
>
>
>
> def table_provider(names):
>
> if not names:
>
> raise Exception("No names provided")
>
> elif names[0] == 'table0':
>
> return test_table_0
>
> else:
>
> raise Exception(f"Unknown table name {names}")
>
>
>
> test_table_0 = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5,
> 6]})
>
>
>
> print(type(result))
>
> print(result)
>
> result_bytes = result.SerializeToString()
>
>
>
> pa.substrait.run_query(result_bytes, table_provider)
>
>
> I wonder if someone has tried integration between these two before and can
> share some working code?
>


Integration between ibis-substrait and Acero

2022-10-04 Thread Li Jin
Hi,

I am testing integration between ibis-substrait and Acero but hit a
segmentation fault. I think this might be because the way I am
integrating these two libraries is wrong; here is my code:


class BasicTests(unittest.TestCase):
    """Test basic features"""

    @classmethod
    def setUpClass(cls):
        cls.compiler = SubstraitCompiler()

    def test_named_table(self):
        """Test basic"""
        t = ibis.table([("a", "int64"), ("b", "int64")], name="table0")
        result = translate(t, self.compiler)

        def table_provider(names):
            if not names:
                raise Exception("No names provided")
            elif names[0] == 'table0':
                return test_table_0
            else:
                raise Exception(f"Unknown table name {names}")

        test_table_0 = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})

        print(type(result))
        print(result)
        result_bytes = result.SerializeToString()

        pa.substrait.run_query(result_bytes, table_provider)


I wonder if someone has tried integration between these two before and can
share some working code?


Re: Register custom ExecNode factories

2022-09-28 Thread Li Jin
Thanks Yaron - I have figured something out. Currently I have created an
internal C++ codebase that exposes the "Initialize" method and an
internal Python codebase that invokes it via Python/C++ bindings, similar to
how the dataset module does it.

On Wed, Sep 28, 2022 at 1:02 PM Yaron Gvili  wrote:

> I agree with Weston about dynamically loading a shared object with
> initialization code for registering node factories. For custom node
> factories, I think this loading would best be done from a separate Python
> module, different than "_exec_plan.pyx", that the user would need to import
> for triggering (once) the registration. This would avoid merging custom
> code into "_exec_plan.pyx" and maintaining it. You would likely want to
> code up files for your module that are analogous to
> "python/pyarrow/includes/libarrow_dataset.pxd",
> "python/pyarrow/_dataset.pxd", and "python/pyarrow/dataset.py". You would
> need to modify the files "python/setup.py" and "python/CMakeLists.txt" in
> order to build your module within PyArrow's build, or alternatively to roll
> your own version of these files to build your Python module separately.
> This is where you would add a build flag for pulling in C++ header files
> for your Python module, under "python/pyarrow/include", and for making it.
>
>
> Yaron.
> 
> From: Li Jin 
> Sent: Wednesday, September 21, 2022 3:51 PM
> To: dev@arrow.apache.org 
> Subject: Re: Register custom ExecNode factories
>
> Thanks Weston - I have not rewritten Python/C++ bridge so this is also new
> to me and I am hoping to get some information from people that know how to
> do this.
>
> I will leave this open for other people to offer help :) and will ask some
> internal folks as well.
>
> Will circle back on this.
>
> On Tue, Sep 20, 2022 at 8:50 PM Weston Pace  wrote:
>
> > I'm not great at this build stuff but I think the basic idea is that
> > you will need to package your custom nodes into a shared object.
> > You'll need to then somehow trigger that shared object to load from
> > python.  This seems like a good place to invoke the initialize method.
> >
> > Currently pyarrow has to do this because the datasets module
> > (libarrow_dataset.so) adds some custom nodes (scan node, dataset write
> > node).  The datasets module defines the Initialize method.  This
> > method is called in _exec_plan.pyx when the python module is loaded.
> > I don't know cython well enough to know how exactly it triggers the
> > datasets shared object to load.
> >
> > On Tue, Sep 20, 2022 at 11:01 AM Li Jin  wrote:
> > >
> > > Hi,
> > >
> > > Recently I am working on adding a custom data source node to Acero and
> > was
> > > pointed to a few examples in the dataset code.
> > >
> > > If I understand this correctly, the registering of dataset exec node is
> > > currently happening when this is loaded:
> > >
> >
> https://github.com/apache/arrow/blob/master/python/pyarrow/_exec_plan.pyx#L36
> > >
> > > I wonder if I have a custom "Initialize'' method that registers
> > additional
> > > ExecNode, where is the right place to invoke such initialization?
> > > Eventually I want to execute my query via ibis-substrait and Acero
> > > substrait consumer Python API.
> > >
> > > Thanks,
> > > Li
> >
>


Re: Substrait consumer for custom data sources

2022-09-27 Thread Li Jin
Thanks both. I think NamedTableProvider is close to what I want, and like
Weston said, the tricky bit is how to use a custom NamedTableProvider when
calling the pyarrow substrait API.

It's a little hacky, but I *think* I can override the value of
"kDefaultNamedTableProvider" here and pass "table_provider=None", and then it
"should" work:
https://github.com/apache/arrow/blob/529f653dfa58887522af06028e5c32e8dd1a14ea/cpp/src/arrow/engine/substrait/options.h#L66

I am going to give that a shot once I pull/build the Arrow default branch into
our internal build system.
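
A hedged sketch of the NamedTableProvider route mentioned above, assuming the
provider is a callable from the named-table name segments to a source
Declaration (the exact signature has varied between Arrow versions, so check
options.h), and that MyTableCatalog / my_tables.Lookup are your own hypothetical
lookup helpers:

```cpp
#include <memory>
#include <string>
#include <vector>

#include "arrow/compute/exec/options.h"      // TableSourceNodeOptions
#include "arrow/engine/substrait/options.h"  // ConversionOptions
#include "arrow/result.h"

arrow::engine::ConversionOptions MakeConversionOptions(MyTableCatalog& my_tables) {
  arrow::engine::ConversionOptions conversion_options;
  conversion_options.named_table_provider =
      [&my_tables](const std::vector<std::string>& names)
      -> arrow::Result<arrow::compute::Declaration> {
    // Resolve the named table however you like (my_tables.Lookup is hypothetical)
    // and wrap it in a table_source declaration for Acero.
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, my_tables.Lookup(names));
    return arrow::compute::Declaration(
        "table_source",
        arrow::compute::TableSourceNodeOptions(std::move(table),
                                               /*max_batch_size=*/1 << 20));
  };
  return conversion_options;
}
```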




On Tue, Sep 27, 2022 at 10:50 AM Benjamin Kietzman 
wrote:

> It seems to me that your use case could be handled by defining a custom
> NamedTableProvider and
> assigning this to ConversionOptions::named_table_provider. This was added
> in
> https://github.com/apache/arrow/pull/13613 to provide user configurable
> dispatching for named tables;
> if it doesn't address your use case then we might want to create a JIRA to
> extend it.
>
> On Tue, Sep 27, 2022 at 10:41 AM Li Jin  wrote:
>
> > I did some more digging into this and have some ideas -
> >
> > Currently, the logic for deserialization named table is:
> >
> >
> https://github.com/apache/arrow/blob/master/cpp/src/arrow/engine/substrait/relation_internal.cc#L129
> > and it will look up named tables from a user provided dictionary from
> > string -> arrow Table.
> >
> > My idea is to make some short term changes to allow named tables to be
> > dispatched differently (This logic can be reverted/removed once we figure
> > out the proper way to support custom data sources, perhaps via substrait
> > Extensions.), specifically:
> >
> > (1) The user creates named table with uris for custom data source, i.e.,
> > "my_datasource://tablename?begin=20200101&end=20210101"
> > (2) In the substrait consumer, allowing user to register custom dispatch
> > rules based on uri scheme (similar to how exec node registry works),
> i.e.,
> > sth like:
> >
> > substrait_named_table_registry.add("my_datasource", deser_my_datasource)
> > and deser_my_datasource is a function that takes the NamedTable substrait
> > message and returns a declaration.
> >
> > I know doing this just for named tables might not be a very general
> > solution but seems the easiest path forward, and we can always remove
> this
> > later in favor of a more generic solution.
> >
> > Thoughts?
> >
> > Li
> >
> >
> >
> >
> >
> > On Mon, Sep 26, 2022 at 10:58 AM Li Jin  wrote:
> >
> > > Hello!
> > >
> > > I am working on adding a custom data source node in Acero. I have a few
> > > previous threads related to this topic.
> > >
> > > Currently, I am able to register my custom factory method with Acero
> and
> > > create a Custom source node, i.e., I can register and execute this with
> > > Acero:
> > >
> > > MySourceNodeOptions source_options = ...
> > > Declaration source{"my_source", source_option}
> > >
> > > The next step I want to do is to pass this through to the Acero
> substrait
> > > consumer. From previous discussions, I am going to use "NamedTable ''
> as
> > a
> > > temporary way to define my custom data source in substrait. My question
> > is
> > > this:
> > >
> > > What I need to do in substrait in order to register my own substrait
> > > consumer rule/function for deserializing my custom named table protobuf
> > > message into the declaration above. If this is not supported right now,
> > > what is a reasonable/minimal change to make this work?
> > >
> > > Thanks,
> > > Li
> > >
> >
>


Re: Substrait consumer for custom data sources

2022-09-27 Thread Li Jin
I did some more digging into this and have some ideas -

Currently, the logic for deserializing named tables is:
https://github.com/apache/arrow/blob/master/cpp/src/arrow/engine/substrait/relation_internal.cc#L129
and it looks up named tables in a user-provided dictionary from
string -> Arrow Table.

My idea is to make some short-term changes to allow named tables to be
dispatched differently (this logic can be reverted/removed once we figure
out the proper way to support custom data sources, perhaps via Substrait
extensions), specifically:

(1) The user creates a named table with a URI for the custom data source, e.g.,
"my_datasource://tablename?begin=20200101&end=20210101"
(2) In the Substrait consumer, allow the user to register custom dispatch
rules based on the URI scheme (similar to how the exec node registry works),
i.e., something like:

substrait_named_table_registry.add("my_datasource", deser_my_datasource)
and deser_my_datasource is a function that takes the NamedTable substrait
message and returns a declaration.

I know doing this just for named tables might not be a very general
solution but seems the easiest path forward, and we can always remove this
later in favor of a more generic solution.

Thoughts?

Li
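
A rough sketch of what such scheme-based dispatch could look like if expressed
as a single provider function (the proposed registry API above does not exist;
arrow::internal::Uri from arrow/util/uri.h is assumed to be available, and
deser_my_datasource is the hypothetical user rule from the proposal):

```cpp
#include <string>
#include <vector>

#include "arrow/compute/exec/exec_plan.h"  // arrow::compute::Declaration
#include "arrow/result.h"
#include "arrow/util/uri.h"

// Hypothetical user-supplied rule: turn a parsed custom URI into a source Declaration.
arrow::Result<arrow::compute::Declaration> deser_my_datasource(const arrow::internal::Uri& uri);

// Dispatch "my_datasource://tablename?begin=20200101&end=20210101" style names
// by URI scheme, mirroring the registry idea described above.
arrow::Result<arrow::compute::Declaration> DispatchNamedTable(
    const std::vector<std::string>& names) {
  arrow::internal::Uri uri;
  ARROW_RETURN_NOT_OK(uri.Parse(names[0]));
  if (uri.scheme() == "my_datasource") {
    return deser_my_datasource(uri);
  }
  return arrow::Status::Invalid("No dispatch rule registered for scheme: ", uri.scheme());
}
```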





On Mon, Sep 26, 2022 at 10:58 AM Li Jin  wrote:

> Hello!
>
> I am working on adding a custom data source node in Acero. I have a few
> previous threads related to this topic.
>
> Currently, I am able to register my custom factory method with Acero and
> create a Custom source node, i.e., I can register and execute this with
> Acero:
>
> MySourceNodeOptions source_options = ...
> Declaration source{"my_source", source_option}
>
> The next step I want to do is to pass this through to the Acero substrait
> consumer. From previous discussions, I am going to use "NamedTable '' as a
> temporary way to define my custom data source in substrait. My question is
> this:
>
> What I need to do in substrait in order to register my own substrait
> consumer rule/function for deserializing my custom named table protobuf
> message into the declaration above. If this is not supported right now,
> what is a reasonable/minimal change to make this work?
>
> Thanks,
> Li
>


Substrait consumer for custom data sources

2022-09-26 Thread Li Jin
Hello!

I am working on adding a custom data source node in Acero. I have a few
previous threads related to this topic.

Currently, I am able to register my custom factory method with Acero and
create a Custom source node, i.e., I can register and execute this with
Acero:

MySourceNodeOptions source_options = ...
Declaration source{"my_source", source_option}

The next step is to pass this through to the Acero Substrait
consumer. From previous discussions, I am going to use "NamedTable" as a
temporary way to define my custom data source in Substrait. My question is
this:

What do I need to do in Substrait in order to register my own Substrait
consumer rule/function for deserializing my custom named table protobuf
message into the declaration above? If this is not supported right now,
what is a reasonable/minimal change to make this work?

Thanks,
Li


Re: Register custom ExecNode factories

2022-09-21 Thread Li Jin
Thanks Weston - I have not written a Python/C++ bridge before, so this is also
new to me, and I am hoping to get some information from people who know how to
do this.

I will leave this open for other people to offer help :) and will ask some
internal folks as well.

Will circle back on this.

On Tue, Sep 20, 2022 at 8:50 PM Weston Pace  wrote:

> I'm not great at this build stuff but I think the basic idea is that
> you will need to package your custom nodes into a shared object.
> You'll need to then somehow trigger that shared object to load from
> python.  This seems like a good place to invoke the initialize method.
>
> Currently pyarrow has to do this because the datasets module
> (libarrow_dataset.so) adds some custom nodes (scan node, dataset write
> node).  The datasets module defines the Initialize method.  This
> method is called in _exec_plan.pyx when the python module is loaded.
> I don't know cython well enough to know how exactly it triggers the
> datasets shared object to load.
>
> On Tue, Sep 20, 2022 at 11:01 AM Li Jin  wrote:
> >
> > Hi,
> >
> > Recently I am working on adding a custom data source node to Acero and
> was
> > pointed to a few examples in the dataset code.
> >
> > If I understand this correctly, the registering of dataset exec node is
> > currently happening when this is loaded:
> >
> https://github.com/apache/arrow/blob/master/python/pyarrow/_exec_plan.pyx#L36
> >
> > I wonder if I have a custom "Initialize'' method that registers
> additional
> > ExecNode, where is the right place to invoke such initialization?
> > Eventually I want to execute my query via ibis-substrait and Acero
> > substrait consumer Python API.
> >
> > Thanks,
> > Li
>


Re: Correct way to collect results from an Acero query

2022-09-21 Thread Li Jin
Oh thanks Weston, I am glad I'm not the only one - I will wait for the PR and
try to pull that in then.

Thanks,
Li

On Wed, Sep 21, 2022 at 1:54 PM Weston Pace  wrote:

> Funny you should mention this, I just ran into the same problem :).
> We use StartAndCollect so much in our unit tests that there must be
> some usefulness there.  You are correct that it is not an API that can
> be used outside of tests.
>
> I added utility methods DeclarationToTable, DeclarationToBatches, and
> DeclarationToExecBatches to exec_plan.h in [1]. These all take in a
> declaration (that does not have a sink node), add a sink node, create
> an exec plan, and run it.  It might be a bit before [1] merges so if
> you want to pull these out into their own PR that might be useful.
>
> The utility methods capture the common case where a user wants to use
> the default exec context and run the plan immediately.  The main
> downside of these utility methods is that they gather all results in
> memory.  However, if you are dealing with small amounts of data (e.g.
> prototyping, testing) or doing some kind of aggregation then this
> might not be a problem.
>
> We could probably also add a DeclarationToReader method in the future.
>
> [1] https://github.com/apache/arrow/pull/13782
>
> On Wed, Sep 21, 2022 at 8:26 AM Li Jin  wrote:
> >
> > Hello!
> >
> > I am testing a custom data source node I added to Acero and found myself
> in
> > need of collecting the results from an Acero query into memory.
> >
> > Searching the codebase, I found "StartAndCollect" is what many of the
> tests
> > and benchmarks are using, but I am not sure if that is the public API to
> do
> > so because:
> > (1) the header file arrow/compute/exec/test_util.h depends on gtest,
> which
> > seems to be a test-only dependency
> > (2) the method "StartAndCollect" doesn't return a Result/Status object,
> so
> > errors probably cannot be propagated.
> >
> > Is there a better way / some other public method to achieve this?
> >
> > Thanks,
> > Li
>
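
A rough usage sketch of the DeclarationToTable helper described above — at the
time it lived in the arrow::compute namespace (it has since moved to
arrow::acero), and MySourceNodeOptions is the custom options type from this
thread, so adjust names and headers to the version in use:

```cpp
#include "arrow/compute/exec/exec_plan.h"  // Declaration, DeclarationToTable (location at the time)
#include "arrow/table.h"

arrow::Result<std::shared_ptr<arrow::Table>> CollectMySource() {
  // Declare the plan fragment; no explicit sink node is needed here.
  arrow::compute::Declaration source{"my_source", MySourceNodeOptions{/*...*/}};
  // DeclarationToTable appends a sink, runs the plan on the default exec
  // context, and gathers every output batch into an in-memory Table.
  return arrow::compute::DeclarationToTable(std::move(source));
}
```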


Correct way to collect results from an Acero query

2022-09-21 Thread Li Jin
Hello!

I am testing a custom data source node I added to Acero and found myself in
need of collecting the results from an Acero query into memory.

Searching the codebase, I found "StartAndCollect" is what many of the tests
and benchmarks are using, but I am not sure if that is the public API to do
so because:
(1) the header file arrow/compute/exec/test_util.h depends on gtest, which
seems to be a test-only dependency
(2) the method "StartAndCollect" doesn't return a Result/Status object, so
errors probably cannot be propagated.

Is there a better way / some other public method to achieve this?

Thanks,
Li


Register custom ExecNode factories

2022-09-20 Thread Li Jin
Hi,

Recently I have been working on adding a custom data source node to Acero and
was pointed to a few examples in the dataset code.

If I understand this correctly, the registering of dataset exec node is
currently happening when this is loaded:
https://github.com/apache/arrow/blob/master/python/pyarrow/_exec_plan.pyx#L36

If I have a custom "Initialize" method that registers additional
ExecNodes, where is the right place to invoke such initialization?
Eventually I want to execute my query via ibis-substrait and Acero
substrait consumer Python API.

Thanks,
Li
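
A hedged sketch of what such an Initialize() might do, mirroring how the
dataset module registers its nodes; the factory name and options handling are
placeholders, and the registry API shown
(default_exec_factory_registry()->AddFactory) is the 2022-era arrow::compute
one, so verify it against the headers you build against:

```cpp
#include <vector>

#include "arrow/compute/exec/exec_plan.h"  // ExecNode, ExecPlan, default_exec_factory_registry
#include "arrow/compute/exec/options.h"    // ExecNodeOptions

namespace my_project {

// Factory matching the ExecFactoryRegistry signature: construct one node in `plan`.
arrow::Result<arrow::compute::ExecNode*> MakeMySourceNode(
    arrow::compute::ExecPlan* plan, std::vector<arrow::compute::ExecNode*> inputs,
    const arrow::compute::ExecNodeOptions& options) {
  // A real implementation would cast `options` to the custom options type and
  // add the node to `plan`; left out of this sketch.
  return arrow::Status::NotImplemented("construct the custom source node here");
}

// Call exactly once, e.g. when the shared object holding the node is loaded.
arrow::Status Initialize() {
  return arrow::compute::default_exec_factory_registry()->AddFactory("my_source",
                                                                     MakeMySourceNode);
}

}  // namespace my_project
```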


Re: Integration between Flight and Acero

2022-09-14 Thread Li Jin
Thanks both for the suggestions, it makes sense.

I will try the SourceNode-with-factory-method approach first, because my
service/client API doesn't support parallel reads yet. (Parallel reading
while preserving data ordering via the Flight protocol is something I have
thought about a little, but it is probably something to solve later.)

Li

On Tue, Sep 13, 2022 at 8:39 PM Weston Pace  wrote:

> Yes.  If you need the source node to read in parallel OR if you have
> multiple fragments (especially if those fragments don't have identical
> schemas) then you want a dataset and not just a plain source node.
>
> On Tue, Sep 13, 2022 at 1:55 PM David Li  wrote:
> >
> > Yeah, I concur with Weston.
> >
> > > To start with I think a custom factory function will be sufficient
> > > (e.g. look at MakeScanNode in scanner.cc for an example).  So the
> > > options would somehow describe the coordinates of the flight endpoint.
> >
> > These 'coordinates' would be a FlightDescriptor.
> >
> > > However, it might be nice if "open a connection to the flight
> > > endpoint" happened during the call to StartProducing and not during
> > > the factory function call.  This could maybe be a follow-up task.
> >
> > The factory would call GetFlightInfo (or maybe GetSchema, from what it
> sounds like) to get the schema, but this wouldn't actually read any data.
> StartProducing would then actually call DoGet to actually read data.
> >
> > ---
> >
> > The reason why I suggested adapting Flight to Dataset, assuming this
> matches the semantics of your service, is because it encapsulates these
> steps, but reuses all the machinery we already have:
> >
> > - Dataset discovery naturally becomes GetFlightInfo. (Semantically, this
> is like beginning execution of a query, and returns one or more partitions
> where the result set can be read.)
> > - Those partitions then each become a Fragment, and then they can be
> read in parallel by Dataset.
> >
> > It sounds like the service in question here isn't quite that complex,
> though, so no need to necessarily go that far.
> >
> > On Tue, Sep 13, 2022, at 19:18, Weston Pace wrote:
> > >> The alternative path of subclassing SourceNode and having
> ExecNode::Init or
> > >> ExecNode::StartProducing seems quite a bit of change (also I don't
> think
> > >> SourceNode is exposed via public header). But let me know if you
> think I am
> > >> missing something.
> > >
> > > Agreed that we don't want to go this route.  David's suggestion is a
> > > good idea.  However, this shouldn't be the responsibility of the
> > > caller exactly.
> > >
> > > In other words (and my lack of detailed knowledge about flight is
> > > probably going to leak here) there should still be a factory function
> > > (e.g. "flight_source" or something like that) and a custom options
> > > object (FlightSourceOptions).
> > >
> > > To start with I think a custom factory function will be sufficient
> > > (e.g. look at MakeScanNode in scanner.cc for an example).  So the
> > > options would somehow describe the coordinates of the flight endpoint.
> > > The factory function would open a connection to the flight endpoint
> > > and convert this into a record batch reader.  Then it would create one
> > > of the nodes that Yaron has contributed and return that.
> > >
> > > However, it might be nice if "open a connection to the flight
> > > endpoint" happened during the call to StartProducing and not during
> > > the factory function call.  This could maybe be a follow-up task.
> > > Perhaps source node could change so that, instead of accepting an
> > > AsyncGenerator, it accepts an AsyncGenerator factory function.  Then
> > > it could execute that function during the call to StartProducing.
> > >
> > > On Tue, Sep 13, 2022 at 4:05 PM Li Jin  wrote:
> > >>
> > >> Thanks Yaron for the pointer to that PR.
> > >>
> > >> On Tue, Sep 13, 2022 at 4:43 PM Yaron Gvili 
> wrote:
> > >>
> > >> > If you can wrap the flight reader as a RecordBatchReader, then
> another
> > >> > possibility is using an upcoming PR (
> > >> > https://github.com/apache/arrow/pull/14041) that enables
> SourceNode to
> > >> > accept it. You would need to know the schema when configuring the
> > >> > SourceNode, but you won't need to derive from SourceNode.
> > >>
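
To make the outline above a bit more concrete, here is a rough sketch of what a
"flight_source" factory could look like. It is only a sketch under several
assumptions: FlightSourceNodeOptions is a made-up options type, the namespaces
assume a recent Arrow where Acero lives under arrow::acero, and it delegates to
the "record_batch_reader_source" factory that Yaron's PR
(https://github.com/apache/arrow/pull/14041) introduced, rather than
hand-building an AsyncGenerator. Error handling, parallel endpoints, and
keeping the FlightClient alive for the lifetime of the reader are all left out.

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/flight/client.h>
#include <arrow/flight/types.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>

namespace acero = arrow::acero;
namespace flight = arrow::flight;

// Made-up options type: the "coordinates" of the Flight endpoint.
struct FlightSourceNodeOptions : public acero::ExecNodeOptions {
  flight::Location location;
  flight::FlightDescriptor descriptor;
};

arrow::Result<acero::ExecNode*> MakeFlightSourceNode(
    acero::ExecPlan* plan, std::vector<acero::ExecNode*> inputs,
    const acero::ExecNodeOptions& options) {
  const auto& flight_options =
      static_cast<const FlightSourceNodeOptions&>(options);
  // Open the connection at factory time; as discussed above, deferring this
  // to StartProducing could be a follow-up.
  ARROW_ASSIGN_OR_RAISE(auto client,
                        flight::FlightClient::Connect(flight_options.location));
  ARROW_ASSIGN_OR_RAISE(auto info,
                        client->GetFlightInfo(flight_options.descriptor));
  // Only the first endpoint is read here; a parallel-reading version would
  // fan out over all endpoints (or go through Dataset instead).
  ARROW_ASSIGN_OR_RAISE(auto stream,
                        client->DoGet(info->endpoints()[0].ticket));
  // NOTE: in real code the client must outlive the reader; that concern is
  // ignored here for brevity.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatchReader> reader,
                        flight::MakeRecordBatchReader(std::move(stream)));
  // Delegate to the built-in reader-backed source node.
  return acero::MakeExecNode(
      "record_batch_reader_source", plan, std::move(inputs),
      acero::RecordBatchReaderSourceNodeOptions{std::move(reader)});
}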

Re: Integration between Flight and Acero

2022-09-13 Thread Li Jin
Thanks Yaron for the pointer to that PR.

On Tue, Sep 13, 2022 at 4:43 PM Yaron Gvili  wrote:

> If you can wrap the flight reader as a RecordBatchReader, then another
> possibility is using an upcoming PR (
> https://github.com/apache/arrow/pull/14041) that enables SourceNode to
> accept it. You would need to know the schema when configuring the
> SourceNode, but you won't need to derive from SourceNode.
>
>
> Yaron.
> ________
> From: Li Jin 
> Sent: Tuesday, September 13, 2022 3:58 PM
> To: dev@arrow.apache.org 
> Subject: Re: Integration between Flight and Acero
>
> Update:
>
> I am going to try what David Li suggested here:
> https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v
>
> This seems to be the least amount of code. This does require calling
> "DoGet" at Acero plan/node creation time rather than execution time but I
> don't think it's a big deal for now.
>
> The alternative path of subclassing SourceNode and having ExecNode::Init or
> ExecNode::StartProducing seems quite a bit of change (also I don't think
> SourceNode is exposed via public header). But let me know if you think I am
> missing something.
>
> Li
>
> On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili  wrote:
>
> > Hi Li,
> >
> > Here's my 2 cents about the Ibis/Substrait part of this.
> >
> > An Ibis expression carries a schema. If you're planning to create an
> > integrated Ibis/Substrait/Arrow solution, then you'll need the schema to
> be
> > available to Ibis in Python. So, you'll need a Python wrapper for the C++
> > implementation you have in mind for the GetSchema method. I think you
> > should pass the schema obtained by (the wrapped) GetSchema to an Ibis
> node,
> > rather than defining a new Ibis node that would have to access the
> network
> > to get the schema on its own.
> >
> > Given the above, I agree with you that when the Acero node is created its
> > schema would already be known.
> >
> >
> > Yaron.
> > 
> > From: Li Jin 
> > Sent: Thursday, September 1, 2022 2:49 PM
> > To: dev@arrow.apache.org 
> > Subject: Re: Integration between Flight and Acero
> >
> > Thanks David. I think my original question might not have been accurate
> so
> > I will try to rephrase my question:
> >
> > My ultimate goal is to add an ibis source node:
> >
> > class MyStorageTable(ibis.TableNode, sch.HasSchema):
> > url = ... # e.g. "my_storage://my_path"
> > begin = ... # e.g. "20220101"
> > end = ... # e.g. "20220201"
> >
> > and pass it to Acero and have Acero create a source node that knows how
> to
> > read from my_storage. Currently, I have a C++ class that looks like this
> > that knows how to read/write data:
> >
> > class MyStorageClient {
> >
> > public:
> >
> > /// \brief Construct a client
> >
> > MyStorageClient(const std::string& service_location);
> >
> >
> >
> > /// \brief Read data from a table streamingly
> >
> > /// \param[in] table_uri
> >
> > /// \param[in] start_time The start time (inclusive), e.g.,
> > '20100101'
> >
> > /// \param[in] end_time The end time (exclusive), e.g.,
> '20100110'
> >
> > arrow::Result>
> > ReadStream(const std::string& table_uri, const std::string& start_time,
> > const std::string& end_time);
> >
> >
> >
> > /// \brief Write data to a table streamingly
> >
> > /// This method will return a FlightStreamWriter that can be used
> > for streaming data into
> >
> > /// \param[in] table_uri
> >
> > /// \param[in] start_time The start time (inclusive), e.g.,
> > '20100101'
> >
> > /// \param[in] end_time The end time (exclusive), e.g.,
> '20100110'
> >
> > arrow::Result WriteStream(const std::string&
> > table_uri, const std::shared_ptr &schema, const
> std::string
> > &start_time, const std::string &end_time);
> >
> >
> >
> > /// \brief Get schema of a table.
> >
> > /// \param[in] table The Smooth table name, e.g.,
> > smooth:/research/user/ljin/test
> >
> > arrow::Result> GetSchema(const
> > std::string& table_uri);
> > };
> >
> > I think Acero node's schema must be known whe

Re: Integration between Flight and Acero

2022-09-13 Thread Li Jin
Update:

I am going to try what David Li suggested here:
https://lists.apache.org/thread/8yfvvyyc79m11z9wql0gzdr25x4b3g7v

This seems to require the least amount of code. It does require calling
"DoGet" at Acero plan/node creation time rather than execution time, but I
don't think that's a big deal for now.

The alternative path of subclassing SourceNode and overriding ExecNode::Init
or ExecNode::StartProducing seems like quite a bit of change (also, I don't
think SourceNode is exposed via a public header). But let me know if you think
I am missing something.

Li

On Tue, Sep 6, 2022 at 4:57 AM Yaron Gvili  wrote:

> Hi Li,
>
> Here's my 2 cents about the Ibis/Substrait part of this.
>
> An Ibis expression carries a schema. If you're planning to create an
> integrated Ibis/Substrait/Arrow solution, then you'll need the schema to be
> available to Ibis in Python. So, you'll need a Python wrapper for the C++
> implementation you have in mind for the GetSchema method. I think you
> should pass the schema obtained by (the wrapped) GetSchema to an Ibis node,
> rather than defining a new Ibis node that would have to access the network
> to get the schema on its own.
>
> Given the above, I agree with you that when the Acero node is created its
> schema would already be known.
>
>
> Yaron.
> 
> From: Li Jin 
> Sent: Thursday, September 1, 2022 2:49 PM
> To: dev@arrow.apache.org 
> Subject: Re: Integration between Flight and Acero
>
> Thanks David. I think my original question might not have been accurate so
> I will try to rephrase my question:
>
> My ultimate goal is to add an ibis source node:
>
> class MyStorageTable(ibis.TableNode, sch.HasSchema):
> url = ... # e.g. "my_storage://my_path"
> begin = ... # e.g. "20220101"
> end = ... # e.g. "20220201"
>
> and pass it to Acero and have Acero create a source node that knows how to
> read from my_storage. Currently, I have a C++ class that looks like this
> that knows how to read/write data:
>
> class MyStorageClient {
>
> public:
>
> /// \brief Construct a client
>
> MyStorageClient(const std::string& service_location);
>
>
>
> /// \brief Read data from a table streamingly
>
> /// \param[in] table_uri
>
> /// \param[in] start_time The start time (inclusive), e.g.,
> '20100101'
>
> /// \param[in] end_time The end time (exclusive), e.g., '20100110'
>
> arrow::Result>
> ReadStream(const std::string& table_uri, const std::string& start_time,
> const std::string& end_time);
>
>
>
> /// \brief Write data to a table streamingly
>
> /// This method will return a FlightStreamWriter that can be used
> for streaming data into
>
> /// \param[in] table_uri
>
> /// \param[in] start_time The start time (inclusive), e.g.,
> '20100101'
>
> /// \param[in] end_time The end time (exclusive), e.g., '20100110'
>
> arrow::Result WriteStream(const std::string&
> table_uri, const std::shared_ptr &schema, const std::string
> &start_time, const std::string &end_time);
>
>
>
> /// \brief Get schema of a table.
>
> /// \param[in] table The Smooth table name, e.g.,
> smooth:/research/user/ljin/test
>
> arrow::Result> GetSchema(const
> std::string& table_uri);
> };
>
> I think Acero node's schema must be known when the node is created, I'd
> imagine I would implement MyStorageExecNode that gets created by
> SubstraitConsumer (via some registration mechanism in SubstraitConsumer):
>
> (1) GetSchema is called in SubstraitConsumer when creating the node
> (network call to the storage backend to get schema)
> (2) ReadStream is called in either ExecNode::Init or
> ExecNode::StartProducing
> to create the FlightStreamReader (3) Some thread (either the Plan's
> execution thread or the thread owned by MyStorageExecNode) will read from
> FlightStreamReader and send data downstream.
>
> Does that sound like the right approach or is there some other way I should
> do this?
>
> On Wed, Aug 31, 2022 at 6:16 PM David Li  wrote:
>
> > Hi Li,
> >
> > It'd depend on how exactly you expect everything to fit together, and I
> > think the way you'd go about it would depend on what exactly the
> > application is. For instance, you could have the application code do
> > everything up through DoGet and get a reader, then create a SourceNode
> from
> > the reader and continue from there.
> >
> > Otherwise, I wou

Re: Question on handling API changes when upgrading Pyarrow

2022-09-10 Thread Li Jin
Thanks Weston. I don’t own the code that broke, so it’s possible that the
warning was ignored.

Thanks for the links!

On Fri, Sep 9, 2022 at 8:44 PM Weston Pace  wrote:

> Breaking changes should be documented in the release notes which are
> announced on the Arrow blog[1][2].  In addition, in pyarrow, changes
> to non-experimental APIs (and often also those made to experimental
> APIs) should go through a deprecation cycle where a warning is emitted
> for at least one release.
>
> It appears the `chunksize` vs. `max_chunksize` change should have
> emitted a warning for the last 3 years[3].  Is it possible that you
> have warnings disabled?
>
> Also, it looks like there should have been a deprecation warning for
> read_schema as well[4].
>
> [1] https://arrow.apache.org/blog/2022/05/15/8.0.0-release/
> [2] https://arrow.apache.org/blog/2022/08/16/9.0.0-release/
> [3]
> https://github.com/apache/arrow/blame/3eb5673597bf67246271b6c9a98e6f812d4e01a7/python/pyarrow/table.pxi#L1991
> [4]
> https://github.com/apache/arrow/blob/apache-arrow-7.0.0/python/pyarrow/__init__.py#L368
>
> On Fri, Sep 9, 2022 at 10:15 AM Li Jin  wrote:
> >
> > After digging the code a bit, it looks like:
> > (1) pyarrow.read_schema should be changed to pyarrow.ipc.read_schema
> > (2) chunksize should be changed to max_chunksize (it was passed in via
> > generic kwargs before, and I am guessing it was wrong in the first
> place)
> >
> > These seem easy enough to fix, but I am wondering in general where I
> > should look first if I hit this sort of issue in the future.
> >
> > On Fri, Sep 9, 2022 at 12:20 PM Li Jin  wrote:
> >
> > > Hi,
> > >
> > > I am trying to update Pyarrow from 7.0 to 9.0 and hit a couple of
> issues
> > > that I believe are because of some API changes. In particular, two
> issues I
> > > saw seem to be
> > >
> > > (1) pyarrow.read_schema is removed
> > > (2) pa.Table.to_batches no longer takes a keyword argument (chunksize)
> > >
> > > What's the best way to find API change notes and fix it?
> > >
> > > Thanks,
> > > Li
> > >
>


Re: Question on handling API changes when upgrading Pyarrow

2022-09-09 Thread Li Jin
After digging the code a bit, it looks like:
(1) pyarrow.read_schema should be changed to pyarrow.ipc.read_schema
(2) chunksize should be changed to max_chunksize (it was passed in via
generic kwargs before, and I am guessing it was wrong in the first place)

These seem easy enough to fix, but I am wondering in general where I should
look first if I hit this sort of issue in the future.

On Fri, Sep 9, 2022 at 12:20 PM Li Jin  wrote:

> Hi,
>
> I am trying to update Pyarrow from 7.0 to 9.0 and hit a couple of issues
> that I believe are because of some API changes. In particular, two issues I
> saw seem to be
>
> (1) pyarrow.read_schema is removed
> (2) pa.Table.to_batches no longer takes a keyword argument (chunksize)
>
> What's the best way to find API change notes and fix it?
>
> Thanks,
> Li
>


Question on handling API changes when upgrading Pyarrow

2022-09-09 Thread Li Jin
Hi,

I am trying to update Pyarrow from 7.0 to 9.0 and hit a couple of issues
that I believe are because of some API changes. In particular, two issues I
saw seem to be

(1) pyarrow.read_schema is removed
(2) pa.Table.to_batches no longer takes a keyword argument (chunksize)

What's the best way to find API change notes and fix it?

Thanks,
Li


Re: Integration between Flight and Acero

2022-09-01 Thread Li Jin
Thanks David. I think my original question might not have been accurate, so
I will try to rephrase it:

My ultimate goal is to add an ibis source node:

class MyStorageTable(ibis.TableNode, sch.HasSchema):
url = ... # e.g. "my_storage://my_path"
begin = ... # e.g. "20220101"
end = ... # e.g. "20220201"

and pass it to Acero and have Acero create a source node that knows how to
read from my_storage. Currently, I have a C++ class that looks like this
that knows how to read/write data:

class MyStorageClient {
 public:
  /// \brief Construct a client
  MyStorageClient(const std::string& service_location);

  // NOTE: the template arguments in the signatures below were lost when this
  // message was archived; they are reconstructed here from the surrounding
  // doc comments and may not match the original code exactly.

  /// \brief Read data from a table as a stream
  /// \param[in] table_uri The table to read from
  /// \param[in] start_time The start time (inclusive), e.g., '20100101'
  /// \param[in] end_time The end time (exclusive), e.g., '20100110'
  arrow::Result<std::unique_ptr<arrow::flight::FlightStreamReader>> ReadStream(
      const std::string& table_uri, const std::string& start_time,
      const std::string& end_time);

  /// \brief Write data to a table as a stream
  /// This method will return a FlightStreamWriter that can be used for
  /// streaming data into the table
  /// \param[in] table_uri The table to write to
  /// \param[in] start_time The start time (inclusive), e.g., '20100101'
  /// \param[in] end_time The end time (exclusive), e.g., '20100110'
  arrow::Result<std::unique_ptr<arrow::flight::FlightStreamWriter>> WriteStream(
      const std::string& table_uri, const std::shared_ptr<arrow::Schema>& schema,
      const std::string& start_time, const std::string& end_time);

  /// \brief Get the schema of a table.
  /// \param[in] table_uri The Smooth table name, e.g.,
  /// smooth:/research/user/ljin/test
  arrow::Result<std::shared_ptr<arrow::Schema>> GetSchema(
      const std::string& table_uri);
};

I think an Acero node's schema must be known when the node is created, so I'd
imagine I would implement a MyStorageExecNode that gets created by the
SubstraitConsumer (via some registration mechanism in the SubstraitConsumer):

(1) GetSchema is called in the SubstraitConsumer when creating the node (a
network call to the storage backend to get the schema).
(2) ReadStream is called in either ExecNode::Init or ExecNode::StartProducing
to create the FlightStreamReader.
(3) Some thread (either the plan's execution thread or a thread owned by
MyStorageExecNode) reads from the FlightStreamReader and sends data
downstream.

Does that sound like the right approach or is there some other way I should
do this?

On Wed, Aug 31, 2022 at 6:16 PM David Li  wrote:

> Hi Li,
>
> It'd depend on how exactly you expect everything to fit together, and I
> think the way you'd go about it would depend on what exactly the
> application is. For instance, you could have the application code do
> everything up through DoGet and get a reader, then create a SourceNode from
> the reader and continue from there.
>
> Otherwise, I would think the way to go would be to be able to create a
> node from a FlightDescriptor (which would contain the URL/parameters in
> your example). In that case, I think it'd fit into Arrow Dataset, under
> ARROW-10524 [1]. In that case, I'd equate GetFlightInfo to dataset
> discovery, and each FlightEndpoint in the FlightInfo to a Fragment. As a
> bonus, there's already good integration between Dataset and Acero and this
> should naturally do things like read the FlightEndpoints in parallel with
> readahead and so on.
>
> That means: you'd start with the FlightDescriptor, and create a Dataset
> from it. This will call GetFlightInfo under the hood. (There's a minor
> catch here: this assumes the service that returns the FlightInfo can embed
> an accurate schema into it. If that's not true, there'll have to be some
> finagling with various ways of getting the actual schema, depending on what
> exactly your service supports.) Once you have a Dataset, you can create an
> ExecPlan and proceed like normal.
>
> Of course, if you then want to get things into Python, R, Substrait,
> etc... that requires some more work - especially for Substrait where I'm
> not sure how best to encode a custom source like that.
>
> [1]: https://issues.apache.org/jira/browse/ARROW-10524
>
> -David
>
> On Wed, Aug 31, 2022, at 17:09, Li Jin wrote:
> > Hello!
> >
> > I have recently started to look into integrating Flight RPC with Acero
> > source/sink node.
> >
> > In Flight, the life cycle of a "read" request looks something like:
> >
> >- User specifies a URL (e.g. my_storage://my_path) and parameters (e.g.,
> >begin = "20220101", end = "20220201")
> >- Client issues GetFlightInfo and gets a FlightInfo from the server
> >- Client issues DoGet with the FlightInfo and gets a stream reader
> >- Client calls Next until the stream is exhausted
> >
> > My question is, how does the above life cycle fit into an Acero node? In
> > other words, what are the proper places in an Acero node's lifecycle to
> > issue the corresponding Flight RPCs?
> >
> > Appreciate any thoughts,
> > Li
>


Integration between Flight and Acero

2022-08-31 Thread Li Jin
Hello!

I have recently started to look into integrating Flight RPC with Acero
source/sink node.

In Flight, the life cycle of a "read" request looks something like:

   - User specifies a URL (e.g. my_storage://my_path) and parameters (e.g.,
   begin = "20220101", end = "20220201")
   - Client issues GetFlightInfo and gets a FlightInfo from the server
   - Client issues DoGet with the FlightInfo and gets a stream reader
   - Client calls Next until the stream is exhausted

My question is, how does the above life cycle fit into an Acero node? In
other words, what are the proper places in an Acero node's lifecycle to issue
the corresponding Flight RPCs?

Appreciate any thoughts,
Li
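
For reference, that lifecycle maps onto the C++ Flight client roughly as in the
sketch below. It is hedged: it assumes the Result-returning client API of
recent releases, and the way the URL plus begin/end parameters are packed into
a FlightDescriptor (a JSON command blob here) is purely hypothetical and
application-specific.

#include <iostream>
#include <memory>
#include <string>

#include <arrow/flight/client.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/status.h>

namespace flight = arrow::flight;

arrow::Status ReadEverything(const std::string& host, int port) {
  ARROW_ASSIGN_OR_RAISE(auto location, flight::Location::ForGrpcTcp(host, port));
  ARROW_ASSIGN_OR_RAISE(auto client, flight::FlightClient::Connect(location));

  // 1. Describe the request; how the URL and begin/end parameters are encoded
  //    is up to the application (an opaque command blob is one option).
  auto descriptor = flight::FlightDescriptor::Command(
      R"({"url": "my_storage://my_path", "begin": "20220101", "end": "20220201"})");

  // 2. GetFlightInfo: learn where the data lives (and, if the server fills it
  //    in, the schema).
  ARROW_ASSIGN_OR_RAISE(auto info, client->GetFlightInfo(descriptor));

  // 3. DoGet on each endpoint's ticket yields a stream reader.
  for (const auto& endpoint : info->endpoints()) {
    ARROW_ASSIGN_OR_RAISE(auto stream, client->DoGet(endpoint.ticket));
    // 4. Next() until the stream is exhausted.
    while (true) {
      ARROW_ASSIGN_OR_RAISE(flight::FlightStreamChunk chunk, stream->Next());
      if (chunk.data == nullptr) break;
      std::cout << "got a batch with " << chunk.data->num_rows() << " rows\n";
    }
  }
  return arrow::Status::OK();
}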


Re: [C++] Read Flight data source into Acero

2022-08-18 Thread Li Jin
Thanks all. I will try this out.

On Thu, Aug 18, 2022 at 9:06 AM Rok Mihevc  wrote:

> +1 for adding this as either a utility function or a cookbook recipe [1].
>
> [1] https://github.com/apache/arrow-cookbook
>
> On Thu, Aug 18, 2022 at 2:34 PM Yaron Gvili  wrote:
>
> > I have code in source_node.cc in a local branch adding factories for
> other
> > sources in SourceNode (e.g., streams of RecordBatch, ExecBatch, or
> > ArrayVector) which I could make a PR for, if there is interest.
> >
> >
> > Yaron.
> > 
> > From: David Li 
> > Sent: Wednesday, August 17, 2022 4:25 PM
> > To: dev@arrow.apache.org 
> > Subject: Re: [C++] Read Flight data source into Acero
> >
> > Convert the reader to a RecordBatchReader [1], turn it into an
> > AsyncGenerator [2], then wrap it in a SourceNode (though I think
> SourceNode
> > should just expose a helper for wrapping a RecordBatchReader like it
> > already does for Tables).
> >
> > [1]:
> >
> https://github.com/apache/arrow/blob/8474ee5a3ed725d4bb56c75fc1b13a53cba1fd1f/cpp/src/arrow/flight/types.h#L561
> > [2]:
> >
> https://github.com/apache/arrow/blob/ee2e9448c8565820ba38a2df9e44ab6055e5df1d/cpp/src/arrow/compute/exec/exec_plan.h#L529-L535
> >
> > On Wed, Aug 17, 2022, at 15:32, Aldrin wrote:
> > > I don't have any pointers, but just wanted to mention that I am going
> to
> > > try and figure this out quite a bit in the next week. I can try to
> create
> > > some relevant cookbook recipes as I plod along.
> > >
> > > Aldrin Montana
> > > Computer Science PhD Student
> > > UC Santa Cruz
> > >
> > >
> > > On Wed, Aug 17, 2022 at 9:15 AM Li Jin  wrote:
> > >
> > >> Correction: I have a flight::FlightStreamReader (not
> > Flight::StreamReader)
> > >>
> > >> On Wed, Aug 17, 2022 at 12:12 PM Li Jin 
> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > I have a Flight data source (effectively a Flight::StreamReader) and
> > I'd
> > >> > like to create an Acero source node from it. I wonder if something
> > >> already
> > >> > exists to do that or if not, perhaps some pointers for me  to take a
> > look
> > >> > at?
> > >> >
> > >> > Thanks,
> > >> > Li
> > >> >
> > >> >
> > >>
> >
>
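
A rough cookbook-style sketch of the recipe described above could look like the
following. It is hedged: it assumes a recent Arrow where Acero lives under
arrow::acero, and it uses the "record_batch_reader_source" factory added later
in https://github.com/apache/arrow/pull/14041 rather than wiring up the
AsyncGenerator by hand; the caller must also keep the FlightClient alive while
the plan runs.

#include <memory>
#include <utility>

#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/flight/client.h>
#include <arrow/flight/types.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/table.h>

namespace acero = arrow::acero;
namespace flight = arrow::flight;

// Given the FlightStreamReader obtained from DoGet, run it through a trivial
// source -> sink plan and collect the result as a Table. A real plan would
// hang projections/filters/joins off the source declaration instead.
arrow::Result<std::shared_ptr<arrow::Table>> FlightStreamToTable(
    std::unique_ptr<flight::FlightStreamReader> stream) {
  // View the Flight reader as a plain RecordBatchReader.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatchReader> reader,
                        flight::MakeRecordBatchReader(std::move(stream)));
  // Feed the reader to a source node and drain the plan into a table.
  acero::Declaration source{
      "record_batch_reader_source",
      acero::RecordBatchReaderSourceNodeOptions{std::move(reader)}};
  return acero::DeclarationToTable(std::move(source));
}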


Re: [C++] Read Flight data source into Acero

2022-08-17 Thread Li Jin
Correction: I have a flight::FlightStreamReader (not Flight::StreamReader)

On Wed, Aug 17, 2022 at 12:12 PM Li Jin  wrote:

> Hi,
>
> I have a Flight data source (effectively a Flight::StreamReader) and I'd
> like to create an Acero source node from it. I wonder if something already
> exists to do that or if not, perhaps some pointers for me  to take a look
> at?
>
> Thanks,
> Li
>
>


[C++] Read Flight data source into Acero

2022-08-17 Thread Li Jin
Hi,

I have a Flight data source (effectively a Flight::StreamReader) and I'd
like to create an Acero source node from it. I wonder if something already
exists to do that or, if not, whether someone could share some pointers for
me to take a look at?

Thanks,
Li


Re: dealing with tester timeout in a CI job

2022-08-17 Thread Li Jin
Yaron, how long do the asof join tests normally take?

On Wed, Aug 17, 2022 at 6:13 AM Yaron Gvili  wrote:

> Sorry, yes, C++. The failed job is
> https://github.com/apache/arrow/runs/7839062613?check_suite_focus=true
> and it timed out on code I wrote (in a PR, not merged). I'd like to avoid a
> timeout without reengineering or reducing the set of tests I wrote, hence
> my questions.
>
>
> Yaron.
> 
> From: Sutou Kouhei 
> Sent: Tuesday, August 16, 2022 8:13 PM
> To: dev@arrow.apache.org 
> Subject: Re: dealing with tester timeout in a CI job
>
> Hi,
>
> What language are you talking about? C++?
> For C++, we have two timeouts:
> * GitHub Action's timeout
> * GTest's timeout
>
> Could you show the URL of the failed macOS related CI job?
>
> Thanks,
> --
> kou
>
> In
>  <
> paxp190mb1565310e470e696da667f540bd...@paxp190mb1565.eurp190.prod.outlook.com
> >
>   "dealing with tester timeout in a CI job" on Tue, 16 Aug 2022 16:34:24
> +,
>   Yaron Gvili  wrote:
>
> > Hi,
> >
> > What are some acceptable ways to handle a timeout failure in a CI job
> for a tester I implemented? For reference, I got such a timeout for only
> one MacOS related CI job, while the other CI jobs did not get such a
> timeout.
> >
> > Let's assume that I cannot (easily) make the tests run any faster. Is it
> possible/acceptable to change the timeout, and how? To turn off some of the
> tests for one or all CI jobs, and how? To split the tester into several, so
> that each meets the timeout allotment?
> >
> >
> > Cheers,
> > Yaron.
>

