Talking with Stefano this morning and looking at the DataSourceTask code, we
discovered that the open() and close() methods are called for every split,
not once per InputFormat instance (maybe open and close should be renamed to
openSplit and closeSplit to avoid confusion...).
I think it would be worth adding 2 methods to the InputFormat (e.g.
openInputFormat() and closeInputFormat()) to allow for the management of the
InputFormat lifecycle; otherwise I'll need to instantiate a pool (and thus
add a dependency) to avoid creating a new connection (an expensive
operation) for every split, which in our use case happens millions of times.
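
Just to make the idea concrete, here is a rough sketch of what I have in
mind (only the lifecycle methods are shown; class, field and method names
are placeholders, not a worked-out design):

    public class MyJdbcInputFormat implements InputFormat<Row, InputSplit> {

        private String dbURL, username, password;
        private transient Connection dbConn;

        // proposed: called once per InputFormat instance, before the first split
        public void openInputFormat() throws IOException {
            try {
                dbConn = DriverManager.getConnection(dbURL, username, password);
            } catch (SQLException e) {
                throw new IOException("Could not open connection", e);
            }
        }

        // proposed: called once per InputFormat instance, after the last split
        public void closeInputFormat() throws IOException {
            try {
                if (dbConn != null) {
                    dbConn.close();
                }
            } catch (SQLException e) {
                throw new IOException("Could not close connection", e);
            }
        }

        @Override
        public void open(InputSplit split) throws IOException {
            // per-split work only: create the statement/result set on the shared dbConn
        }

        @Override
        public void close() throws IOException {
            // per-split cleanup only: close the statement/result set, keep dbConn open
        }
    }

This way the expensive connection setup would happen once per parallel
instance instead of once per split.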

What about the output of the InputFormat? How do you want me to proceed:
with a POJO or Row? If POJO, which strategy do you suggest?

Best,
Flavio

On Fri, Apr 15, 2016 at 2:06 PM, Stefano Bortoli <s.bort...@gmail.com>
wrote:

> If we share the connection, then we should also be careful with the close()
> implementation. I did not see changes for this method in the PR.
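>
> For example (just a sketch; statement and resultSet would be the per-split
> resources of the JDBCInputFormat):
>
>     @Override
>     public void close() throws IOException {
>         try {
>             // per-split cleanup: release the split's resources,
>             // but do NOT close the shared connection here
>             if (resultSet != null) resultSet.close();
>             if (statement != null) statement.close();
>         } catch (SQLException e) {
>             throw new IOException(e);
>         }
>     }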
>
> saluti,
> Stefano
>
> 2016-04-15 11:01 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
> > Following your suggestions I've fixed the connection reuse in my PR at
> > https://github.com/apache/flink/pull/1885.
> > I simply check in establishConnection() whether dbConn != null and, in
> > that case, return immediately.
> >
> > Thus, the only remaining thing to fix is the null handling. Do you have
> > any suggestion about how to transform the results into a POJO?
> > Maybe returning a Row and then letting the user manage the conversion to
> > the target POJO in a subsequent map could be a more general solution?
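> >
> > Something like this, I mean (Person and its fields are just a made-up
> > example; I'm assuming a Row type with a positional getField(int)
> > accessor):
> >
> >     DataSet<Row> rows = env.createInput(jdbcInputFormat);
> >
> >     DataSet<Person> persons = rows.map(new MapFunction<Row, Person>() {
> >         @Override
> >         public Person map(Row row) {
> >             // user-provided, null-aware conversion to the target POJO
> >             Person p = new Person();
> >             p.name = (String) row.getField(0);
> >             p.age = (Integer) row.getField(1); // may be null
> >             return p;
> >         }
> >     });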
> >
> > Best,
> > Flavio
> >
> > On Thu, Apr 14, 2016 at 6:52 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > There is an InputFormat object for each parallel task of a DataSource.
> > > So for a source with parallelism 8 you will have 8 instances of the
> > > InputFormat running, regardless of whether this is on one box with 8
> > > slots or 8 machines with 1 slot each.
> > > The same is true for all other operators (Map, Reduce, Join, etc.) and
> > > DataSinks.
> > >
> > > Note, a single task does not fill a slot; rather, a "slice" of the
> > > program (one parallel task of each operator) fills a slot.
> > >
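> > > For example (myInputFormat and MyType are placeholders):
> > >
> > >     DataSet<MyType> data = env
> > >         .createInput(myInputFormat)
> > >         .setParallelism(8); // -> 8 InputFormat instances at runtime
> > >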
> > > Cheers, Fabian
> > >
> > > 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> > >
> > > > ok thanks! Just one last question: is an InputFormat instantiated for
> > > > each task slot or once per task manager?
> > > > On 14 Apr 2016 18:07, "Chesnay Schepler" <ches...@apache.org> wrote:
> > > >
> > > > > no.
> > > > >
> > > > > if (connection == null) {
> > > > >     establishConnection();
> > > > > }
> > > > >
> > > > > done. same connection for all splits.
> > > > >
> > > > > On 14.04.2016 17:59, Flavio Pompermaier wrote:
> > > > >
> > > > >> I didn't understand what you mean by "it should also be possible
> > > > >> to reuse the same connection of an InputFormat across InputSplits,
> > > > >> i.e., calls of the open() method".
> > > > >> At the moment the open() method calls establishConnection(), thus
> > > > >> a new connection is created for each split.
> > > > >> If I understood correctly, you're suggesting to create a pool in
> > > > >> the InputFormat and simply call pool.borrow() in open() rather
> > > > >> than establishConnection()?
> > > > >>
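> > > > >> I.e., something like this (using Apache Commons Pool just to
> > > > >> illustrate the idea; connectionFactory would be ours):
> > > > >>
> > > > >>     // hypothetical: a pool of reusable JDBC connections
> > > > >>     GenericObjectPool<Connection> pool =
> > > > >>         new GenericObjectPool<>(connectionFactory);
> > > > >>
> > > > >>     public void open(InputSplit split) throws IOException {
> > > > >>         try {
> > > > >>             dbConn = pool.borrowObject(); // instead of establishConnection()
> > > > >>         } catch (Exception e) {
> > > > >>             throw new IOException(e);
> > > > >>         }
> > > > >>     }
> > > > >>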
> > > > >> On 14 Apr 2016 17:28, "Chesnay Schepler" <ches...@apache.org> wrote:
> > > > >>
> > > > >> On 14.04.2016 17:22, Fabian Hueske wrote:
> > > > >>>
> > > > >>>> Hi Flavio,
> > > > >>>>
> > > > >>>> Those are good questions.
> > > > >>>>
> > > > >>>> 1) Replacing null values by default values and simply forwarding
> > > > >>>> records is very dangerous, in my opinion.
> > > > >>>> I see two alternatives: A) we use a data type that tolerates null
> > > > >>>> values. This could be a POJO that the user has to provide, or Row.
> > > > >>>> The drawback of Row is that it is untyped and not easy to handle.
> > > > >>>> B) We use Tuple and add an additional field that holds an Integer
> > > > >>>> which serves as a bitset to mark null fields. This would be a
> > > > >>>> pretty low-level API though. I am leaning towards the
> > > > >>>> user-provided POJO option.
> > > > >>>>
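> > > > >>>> To make option B concrete, roughly (all names made up): a
> > > > >>>> Tuple3<String, Double, Integer> where the last field is a bitmask
> > > > >>>> marking the null columns:
> > > > >>>>
> > > > >>>>     int nullMask = 0;
> > > > >>>>     if (rs.getObject(1) == null) nullMask |= 1;      // field 0 is NULL
> > > > >>>>     if (rs.getObject(2) == null) nullMask |= 1 << 1; // field 1 is NULL
> > > > >>>>     Tuple3<String, Double, Integer> t =
> > > > >>>>         new Tuple3<>(rs.getString(1), rs.getDouble(2), nullMask);
> > > > >>>>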
> > > > >>>> i would also lean towards the POJO option.
> > > > >>>
> > > > >>>> 2) The JDBCInputFormat is located in a dedicated Maven module. I
> > > > >>>> think we can add a dependency to that module. However, it should
> > > > >>>> also be possible to reuse the same connection of an InputFormat
> > > > >>>> across InputSplits, i.e., calls of the open() method. Wouldn't
> > > > >>>> that be sufficient?
> > > > >>>>
> > > > >>>> this is the right approach imo.
> > > > >>>
> > > > >>> Best, Fabian
> > > > >>>>
> > > > >>>> 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> > > > >>>>
> > > > >>>>> Hi guys,
> > > > >>>>>
> > > > >>>>> I'm integrating the comments of Chesnay into my PR, but there are
> > > > >>>>> a couple of things that I'd like to discuss with the core
> > > > >>>>> developers.
> > > > >>>>>
> > > > >>>>>      1. About the JDBC type mapping (addValue() method at [1]): at
> > > > >>>>>      the moment, if I find a null value for a Double, the
> > > > >>>>>      getDouble() of JDBC returns 0.0. Is that really the correct
> > > > >>>>>      behaviour? Wouldn't it be better to use a POJO or Row, which
> > > > >>>>>      can handle null? Moreover, the mapping between SQL types and
> > > > >>>>>      Java types varies a lot between JDBC implementations.
> > > > >>>>>      Wouldn't it be better to rely on the Java type coming from
> > > > >>>>>      resultSet.getObject() to get such a mapping rather than using
> > > > >>>>>      the ResultSetMetaData types? (See the sketch after the [1]
> > > > >>>>>      link below.)
> > > > >>>>>      2. I'd like to handle connections very efficiently because we
> > > > >>>>>      have a use case with billions of records, and thus millions
> > > > >>>>>      of splits, and establishing a new connection each time could
> > > > >>>>>      be expensive. Would it be a problem to add the Apache Pool
> > > > >>>>>      dependency to the JDBC batch connector in order to reuse the
> > > > >>>>>      created connections?
> > > > >>>>>
> > > > >>>>> [1]
> > > > >>>>> https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
> > > > >>>>>
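> > > > >>>>> For point 1, what I mean is something like this (just a sketch;
> > > > >>>>> names like reuse and pos are illustrative only):
> > > > >>>>>
> > > > >>>>>     // derive the Java type from the value itself instead of
> > > > >>>>>     // the ResultSetMetaData type
> > > > >>>>>     Object value = resultSet.getObject(pos);
> > > > >>>>>     if (value == null) {
> > > > >>>>>         // with a null-tolerant type we could keep the null
> > > > >>>>>         // instead of the 0.0 that getDouble() would return
> > > > >>>>>         reuse.setField(null, pos - 1);
> > > > >>>>>     } else {
> > > > >>>>>         reuse.setField(value, pos - 1);
> > > > >>>>>     }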
> > > > >>>>>
> > > > >>>>>
> > > > >
> > > >
> > >
> >
>
