If we share the connection, then we should also be careful with the close() implementation. I did not see changes for this method in the PR.
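To make the concern concrete, here is a minimal sketch of how open() and close() could behave once the connection is shared. It is not the code from the PR: the class name, the Supplier-based factory and releaseConnection() are hypothetical names of mine, and a generic AutoCloseable stands in for the JDBC connection so the example runs without a database.

```java
import java.util.function.Supplier;

// Hypothetical sketch, not the JDBCInputFormat from the PR. The class name,
// the Supplier-based factory and releaseConnection() are assumptions made
// for illustration only.
class SharedConnectionFormat<C extends AutoCloseable> {
    private final Supplier<C> connectionFactory;
    private C connection;       // shared across all splits of this instance
    private boolean splitOpen;  // stands in for per-split state (statement, result set)

    SharedConnectionFormat(Supplier<C> connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    // Called once per split: connect lazily, then set up per-split state.
    void open() {
        if (connection == null) {
            connection = connectionFactory.get();
        }
        splitOpen = true;
    }

    // Called once per split: release per-split state only, keep the connection.
    void close() {
        splitOpen = false;
    }

    // Called once, after the instance has processed its last split.
    void releaseConnection() throws Exception {
        if (connection != null) {
            try {
                connection.close();
            } finally {
                connection = null; // allow a later open() to reconnect
            }
        }
    }

    boolean isConnected() {
        return connection != null;
    }
}
```

If close() kept closing the connection as before, the null check in establishConnection() would either reconnect for every split or, if the field is not reset, hand out a closed connection. The open question is which hook should call the final release.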
Regards,
Stefano

2016-04-15 11:01 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Following your suggestions I've fixed the connection reuse in my PR at
> https://github.com/apache/flink/pull/1885.
> I simply check in establishConnection() whether dbConn != null and, in
> that case, return immediately.
>
> Thus, the only remaining thing to fix is the null handling. Do you have
> any suggestion about how to transform the results into a POJO?
> Maybe returning a Row and then letting the user manage the conversion to
> the target POJO in a subsequent map could be a more general solution?
>
> Best,
> Flavio
>
> On Thu, Apr 14, 2016 at 6:52 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > There is an InputFormat object for each parallel task of a DataSource.
> > So for a source with parallelism 8 you will have 8 instances of the
> > InputFormat running, regardless of whether this is on one box with
> > 8 slots or 8 machines with 1 slot each.
> > The same is true for all other operators (Map, Reduce, Join, etc.) and
> > DataSinks.
> >
> > Note, a single task does not fill a slot, but a "slice" of the program
> > (one parallel task of each operator) fills a slot.
> >
> > Cheers, Fabian
> >
> > 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> >
> > > ok thanks! just one last question: is an InputFormat instantiated
> > > for each task slot or once per task manager?
> > > On 14 Apr 2016 18:07, "Chesnay Schepler" <ches...@apache.org> wrote:
> > >
> > > > no.
> > > >
> > > > if (connection==null) {
> > > >     establishConnection();
> > > > }
> > > >
> > > > done. same connection for all splits.
> > > >
> > > > On 14.04.2016 17:59, Flavio Pompermaier wrote:
> > > >
> > > > > I didn't understand what you mean by "it should also be possible
> > > > > to reuse the same connection of an InputFormat across InputSplits,
> > > > > i.e., calls of the open() method".
> > > > > At the moment in the open method there's a call to
> > > > > establishConnection, thus, a new connection is created for each
> > > > > split.
> > > > > If I understood correctly, you're suggesting to create a pool in
> > > > > the inputFormat and simply call pool.borrow() in open() rather
> > > > > than establishConnection?
> > > > >
> > > > > On 14 Apr 2016 17:28, "Chesnay Schepler" <ches...@apache.org> wrote:
> > > > >
> > > > > > On 14.04.2016 17:22, Fabian Hueske wrote:
> > > > > >
> > > > > > > Hi Flavio,
> > > > > > >
> > > > > > > those are good questions.
> > > > > > >
> > > > > > > 1) Replacing null values by default values and simply
> > > > > > > forwarding records is very dangerous, in my opinion.
> > > > > > > I see two alternatives: A) we use a data type that tolerates
> > > > > > > null values. This could be a POJO that the user has to provide
> > > > > > > or Row. The drawback of Row is that it is untyped and not easy
> > > > > > > to handle. B) We use Tuple and add an additional field that
> > > > > > > holds an Integer which serves as a bitset to mark null fields.
> > > > > > > This would be a pretty low level API though. I am leaning
> > > > > > > towards the user-provided POJO option.
> > > > > >
> > > > > > i would also lean towards the POJO option.
> > > > > >
> > > > > > > 2) The JDBCInputFormat is located in a dedicated Maven module.
> > > > > > > I think we can add a dependency to that module. However, it
> > > > > > > should also be possible to reuse the same connection of an
> > > > > > > InputFormat across InputSplits, i.e., calls of the open()
> > > > > > > method. Wouldn't that be sufficient?
> > > > > >
> > > > > > this is the right approach imo.
> > > > > >
> > > > > > > Best, Fabian
> > > > > > >
> > > > > > > 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> > > > > > >
> > > > > > > > Hi guys,
> > > > > > > >
> > > > > > > > I'm integrating the comments of Chesnay into my PR but
> > > > > > > > there are a couple of things that I'd like to discuss with
> > > > > > > > the core developers.
> > > > > > > >
> > > > > > > > 1. About the JDBC type mapping (addValue() method at [1]):
> > > > > > > >    at the moment, if I find a null value for a Double, the
> > > > > > > >    getDouble of JDBC returns 0.0. Is that really the correct
> > > > > > > >    behaviour? Wouldn't it be better to use a POJO or the Row
> > > > > > > >    of datatable that can handle null? Moreover, the mapping
> > > > > > > >    between SQL types and Java types varies a lot between
> > > > > > > >    JDBC implementations. Wouldn't it be better to rely on
> > > > > > > >    the Java type coming from resultSet.getObject() to get
> > > > > > > >    such a mapping rather than using the ResultSetMetadata
> > > > > > > >    types?
> > > > > > > > 2. I'd like to handle connections very efficiently because
> > > > > > > >    we have a use case with billions of records and thus
> > > > > > > >    millions of splits, and establishing a new connection
> > > > > > > >    each time could be expensive. Would it be a problem to
> > > > > > > >    add the apache pool dependency to the jdbc batch
> > > > > > > >    connector in order to reuse the created connections?
> > > > > > > >
> > > > > > > > [1] https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java