[jira] [Commented] (BEAM-12044) JdbcIO should explicitly setAutoCommit to false

2021-03-25 Thread Eugene Kirpichov (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308870#comment-17308870
 ] 

Eugene Kirpichov commented on BEAM-12044:
-

[~aromanenko] my take: it is possible, but I think Sylvain's PR improves the 
default behavior for all users, so we should do it by default. Because this is 
in JdbcIO.read(), there are no commits happening anyway, so this should be 
purely a performance improvement with no semantic changes.

Actually there's one possible caveat that I'll mention on the PR in a moment.

> JdbcIO should explicitly setAutoCommit to false
> ---
>
> Key: BEAM-12044
> URL: https://issues.apache.org/jira/browse/BEAM-12044
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.28.0
>Reporter: Sylvain Veyrié
>Priority: P2
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Hello,
> Per [PostgreSQL JDBC 
> documentation|https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor],
>  autocommit must be explicitly disabled on the connection to allow cursor 
> streaming.
> [~jkff] mentioned it [on the mailing 
> list|https://www.mail-archive.com/dev@beam.apache.org/msg16808.html], however 
> even if there is:
> {code:java}
> poolableConnectionFactory.setDefaultAutoCommit(false);
> {code}
> in JdbcIO:1555, currently, at least with JDBC driver 42.2.16, any read with 
> JdbcIO will load the whole dataset into memory (which leads to OOM), since 
> {code:java}
> connection.getAutoCommit()
> {code}
> returns true in JdbcIO#ReadFn#processElement.
> I can provide a PR — the patch is pretty simple (and solves the problem for 
> us in 2.28.0):
> {code:java}
> if (connection == null) {
>   connection = dataSource.getConnection();
> }
> connection.setAutoCommit(false); // line added
> {code}
> Thanks!
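The proposed one-line fix can be sketched against a stub connection; `prepareForStreaming` and `stubConnection` below are illustrative helper names, not JdbcIO code, and the stub stands in for a real database connection:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;

public class AutoCommitSketch {
    // The essence of the patch: always disable autocommit on the connection
    // before reading, so drivers like PostgreSQL can stream results with a
    // cursor instead of materializing the whole ResultSet in memory.
    static void prepareForStreaming(Connection connection) throws Exception {
        if (connection.getAutoCommit()) {
            connection.setAutoCommit(false); // the line added by the patch
        }
    }

    // Stub java.sql.Connection backed by a one-element state array (no real DB);
    // only getAutoCommit/setAutoCommit are implemented.
    static Connection stubConnection(boolean[] autoCommitState) {
        InvocationHandler handler = (proxy, method, methodArgs) -> {
            switch (method.getName()) {
                case "getAutoCommit": return autoCommitState[0];
                case "setAutoCommit": autoCommitState[0] = (Boolean) methodArgs[0]; return null;
                default: return null;
            }
        };
        return (Connection) Proxy.newProxyInstance(
            Connection.class.getClassLoader(), new Class<?>[] {Connection.class}, handler);
    }

    public static void main(String[] args) throws Exception {
        boolean[] state = {true}; // what the reporter observed: autocommit is on
        prepareForStreaming(stubConnection(state));
        System.out.println(state[0]); // false
    }
}
```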



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-12044) JdbcIO should explicitly setAutoCommit to false

2021-03-24 Thread Eugene Kirpichov (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308322#comment-17308322
 ] 

Eugene Kirpichov commented on BEAM-12044:
-

When glancing over the code, it seems the setDefaultAutoCommit line in Beam's 
JdbcIO is actually never executed: it only takes effect when you use 
PoolableDataSourceProvider, which is not used by default, and the Scio JdbcIO 
isn't using it either.

I think your proposed patch makes sense, please send a PR, thanks! Just please 
add a comment explaining why the connection must not be in autocommit mode. A 
link to the Postgres documentation might be good enough, as long as there are 
no other database engines requiring the connection to be *in* autocommit mode 
to allow streaming, which seems unlikely.
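For reference, the conditions the linked pgjdbc documentation lists for cursor-based streaming can be summarized as a predicate; the helper name and structure below are illustrative, not pgjdbc API:

```java
import java.sql.ResultSet;

public class CursorConditionsSketch {
    // Conditions under which the PostgreSQL JDBC driver uses a cursor and
    // streams results in fetch-size-sized batches, per the pgjdbc docs
    // linked in the ticket.
    static boolean streamsWithCursor(boolean autoCommit, int resultSetType, int fetchSize) {
        return !autoCommit                                  // connection must not be in autocommit mode
            && resultSetType == ResultSet.TYPE_FORWARD_ONLY // forward-only result set
            && fetchSize > 0;                               // non-zero fetch size
    }

    public static void main(String[] args) {
        // What the reporter observed: autocommit still true, so no cursor,
        // and the driver materializes the whole result set.
        System.out.println(streamsWithCursor(true, ResultSet.TYPE_FORWARD_ONLY, 1000));  // false
        // After connection.setAutoCommit(false):
        System.out.println(streamsWithCursor(false, ResultSet.TYPE_FORWARD_ONLY, 1000)); // true
    }
}
```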



[jira] [Commented] (BEAM-12044) JdbcIO should explicitly setAutoCommit to false

2021-03-24 Thread Eugene Kirpichov (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-12044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17307957#comment-17307957
 ] 

Eugene Kirpichov commented on BEAM-12044:
-

[~sveyrie] Thanks! I no longer work on Beam, but am curious about this issue. 
Do I understand correctly that the Postgres driver ignores 
`poolableConnectionFactory.setDefaultAutoCommit(false);` and you need to 
explicitly `connection.setAutoCommit(false)`? That sounds like a very unlikely 
bug in the driver, so I suspect that something else is going on - either a bug 
in JdbcIO, or a bug in your pipeline.

Could you give a code snippet of how you instantiate JdbcIO in your pipeline?



[jira] [Updated] (BEAM-10702) Embedded job endpoint artifact service unzips PIP files, making them non-installable

2020-08-28 Thread Eugene Kirpichov (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Kirpichov updated BEAM-10702:

Fix Version/s: 2.24.0

> Embedded job endpoint artifact service unzips PIP files, making them 
> non-installable
> 
>
> Key: BEAM-10702
> URL: https://issues.apache.org/jira/browse/BEAM-10702
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.22.0
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>Priority: P2
> Fix For: 2.24.0
>
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> For details see this thread: 
> https://lists.apache.org/thread.html/r36931794495dc2745c39e11410577931a0a8cfb07a48253eaeda3246%40%3Cuser.beam.apache.org%3E
> The thread also contains a solution. I'll send a PR - filing the JIRA just to 
> track.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-10702) Embedded job endpoint artifact service unzips PIP files, making them non-installable

2020-08-13 Thread Eugene Kirpichov (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Kirpichov updated BEAM-10702:

Status: Resolved  (was: Open)



[jira] [Created] (BEAM-10702) Embedded job endpoint artifact service unzips PIP files, making them non-installable

2020-08-13 Thread Eugene Kirpichov (Jira)
Eugene Kirpichov created BEAM-10702:
---

 Summary: Embedded job endpoint artifact service unzips PIP files, 
making them non-installable
 Key: BEAM-10702
 URL: https://issues.apache.org/jira/browse/BEAM-10702
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Affects Versions: 2.22.0
Reporter: Eugene Kirpichov
Assignee: Eugene Kirpichov




[jira] [Commented] (BEAM-9456) Upgrade to gradle 6.2

2020-08-11 Thread Eugene Kirpichov (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175823#comment-17175823
 ] 

Eugene Kirpichov commented on BEAM-9456:


FWIW - this issue is further exacerbated by the fact that the Gradle version 
currently used in Beam is incompatible with the latest JDK, e.g. the JDK used 
by default in macOS Catalina: 
https://github.com/gradle/gradle/issues/10248#issuecomment-647923554

> Upgrade to gradle 6.2
> -
>
> Key: BEAM-9456
> URL: https://issues.apache.org/jira/browse/BEAM-9456
> Project: Beam
>  Issue Type: Task
>  Components: build-system
>Reporter: Alex Van Boxel
>Priority: P2
>






[jira] [Commented] (BEAM-9316) FileIO.Write.relativeFileNaming should not be public

2020-02-14 Thread Eugene Kirpichov (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037332#comment-17037332
 ] 

Eugene Kirpichov commented on BEAM-9316:


Could you elaborate why the current behavior is undesirable? Do I understand 
correctly that in the given example it'll write to 
gs://some/bucket/some/directory/ ? If so, that seems fairly intuitive to me and 
arguably a useful thing to have: writing to subdirectories within a base 
directory specified with .to().

I think making relativeFileNaming private would be unfortunate: it's a 
backwards incompatible change (can break people who use it), and it seems 
useful to have something of that sort.
As for checking whether a fileNaming is relative - I don't think it can be done 
in general (what if somebody writes a new FileNaming that, logically, is 
relative, but isn't constructed via relativeFileNaming). I guess you could 
throw runtime errors if .to() is specified and the FileNaming *returns* 
something that contains a slash? But again I'm not sure that's desirable.

I acknowledge that there is some confusion but I think the best way to address 
it would be to rename things more clearly (and deprecate the old names). E.g. 
maybe something like: to() -> toBaseDirectory() and relativeFileNaming() -> 
toSubDirectory()? There are probably better options too.

> FileIO.Write.relativeFileNaming should not be public
> 
>
> Key: BEAM-9316
> URL: https://issues.apache.org/jira/browse/BEAM-9316
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Reporter: Claire McGinty
>Priority: Major
>
> I think the existing FileIO.writeDynamic is a bit easy to misuse, as 
> something like this looks correct, and compiles:
>  
> {code:scala}
> FileIO.writeDynamic()
>   .by(...)
>   .withNaming(new SerializableFunction[String, FileNaming] {
>     override def apply(str: String): FileNaming =
>       FileIO.Write.relativeFileNaming(
>         "some/directory",
>         new FileNaming {
>           override def filename(window: BoundedWindow, pane: PaneInfo, numShards: Int, shardIndex: Int, compression: Compression): String =
>             "some_filename.txt"
>         })
>   })
>   .via(...)
>   .to("gs://some/bucket")
> {code}
>  
> However, for dynamic writes, if `outputDirectory` (.to("...")) is set, under 
> the hood, Beam will wrap the provided `fileNamingFn` in 
> `FileIO.Write.relativeFileNaming(...)` as well, so it ends up as a nested 
> `relativeFileNaming` function. 
> ([https://github.com/apache/beam/blob/da9e17288e8473925674a4691d9e86252e67d7d7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1243])
>  
> IMO, `relativeFileNaming` should either be made private, so that it's only 
> used internally by FileIO.Write, or a precondition should be added when a 
> dynamic FileIO.Write is expanded, to check that `outputDirectory` can't be 
> set if the provided `fileNamingFn` is relative.
>  
> wdyt?
>  
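The double-wrapping described in the report can be illustrated with a tiny stand-in for relative naming; `relativeNaming` and `composedName` are hypothetical helpers mirroring the prefixing semantics, not the Beam API:

```java
import java.util.function.Supplier;

public class RelativeNamingSketch {
    // Stand-in for FileIO.Write.relativeFileNaming: prefixes whatever the
    // inner naming fn produces with a base directory.
    static Supplier<String> relativeNaming(String baseDir, Supplier<String> inner) {
        return () -> baseDir + "/" + inner.get();
    }

    // The user wraps their naming fn in relativeFileNaming themselves, and
    // FileIO.Write wraps it again with the .to() directory, nesting the paths.
    static String composedName() {
        Supplier<String> userNaming = () -> "some_filename.txt";
        Supplier<String> wrappedByUser = relativeNaming("some/directory", userNaming);
        Supplier<String> wrappedByWrite = relativeNaming("gs://some/bucket", wrappedByUser);
        return wrappedByWrite.get();
    }

    public static void main(String[] args) {
        System.out.println(composedName());
        // gs://some/bucket/some/directory/some_filename.txt
    }
}
```

This is the "write to subdirectories within a base directory" behavior the comment above calls arguably useful, and also the nesting the reporter considers easy to trigger by accident.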





[jira] [Commented] (BEAM-8942) Improve support for lambdas that throw checked exceptions in various transforms

2019-12-10 Thread Eugene Kirpichov (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992989#comment-16992989
 ] 

Eugene Kirpichov commented on BEAM-8942:


Beam Java has a few interfaces for functions - SerializableFunction (doesn't 
throw, takes lambdas), ProcessFunction (throws, takes lambdas), 
InferableFunction (throws, doesn't take lambdas).
Just having 3 interfaces is already confusing - it seems like 
SerializableFunction should be deleted or deprecated.

However, we should make it so that various PTransforms taking lambdas, e.g. 
Map/FlatMapElements.via(), FileIO.write().by() etc. can all take throwing 
lambdas. That might be by removing the SerializableFunction overloads or by 
adding new methods or any other means.
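The distinction between the interfaces can be sketched with simplified stand-ins; these are not the actual Beam definitions, just minimal shapes showing why a throwing lambda fits one but not the other:

```java
import java.io.IOException;

public class ThrowingFnSketch {
    // SerializableFunction-style: declares no checked exceptions, so a lambda
    // that throws IOException does not compile against it.
    interface PlainFn<I, O> { O apply(I input); }
    // ProcessFunction-style: declares `throws Exception`, so throwing lambdas fit.
    interface ThrowingFn<I, O> { O apply(I input) throws Exception; }

    static Integer parseStrict(String s) throws Exception {
        ThrowingFn<String, Integer> fn = input -> {
            if (input.isEmpty()) throw new IOException("empty input"); // checked exception in a lambda
            return Integer.parseInt(input);
        };
        // The same lambda body would not compile as a PlainFn, because
        // IOException is checked and PlainFn.apply declares no `throws`.
        return fn.apply(s);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parseStrict("42")); // 42
    }
}
```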

> Improve support for lambdas that throw checked exceptions in various 
> transforms
> ---
>
> Key: BEAM-8942
> URL: https://issues.apache.org/jira/browse/BEAM-8942
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Tomo Suzuki
>Priority: Major
>
> Ticket to followup the below:
> [~jkff] says:
> {quote}Would be nice (not in this PR) to make it easier to do 
> MapElements.via() with throwing functions without the hassle of fn(lambda, 
> Requirements.empty()).
> {quote}
>  
> [https://github.com/apache/beam/pull/10256/files#r356282252]
>  
>  





[jira] [Updated] (BEAM-8942) Improve support for lambdas that throw checked exceptions in various transforms

2019-12-10 Thread Eugene Kirpichov (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Kirpichov updated BEAM-8942:
---
Summary: Improve support for lambdas that throw checked exceptions in 
various transforms  (was: MapElements.via to work with lambda that has 
IOException )



[jira] [Commented] (BEAM-8561) Add ThriftIO to Support IO for Thrift Files

2019-11-06 Thread Eugene Kirpichov (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-8561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968834#comment-16968834
 ] 

Eugene Kirpichov commented on BEAM-8561:


Please above all make sure that this connector provides APIs compatible with 
FileIO: i.e. ThriftIO.readFiles() and ThriftIO.sink().
Providing ThriftIO.read() and write() for basic use cases also makes sense, but 
there's no need to make them super customizable, or to provide readAll() - all 
advanced use cases can be handled by the two methods above + FileIO.

> Add ThriftIO to Support IO for Thrift Files
> ---
>
> Key: BEAM-8561
> URL: https://issues.apache.org/jira/browse/BEAM-8561
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-files
>Reporter: Chris Larsen
>Assignee: Chris Larsen
>Priority: Minor
>
> Similar to AvroIO it would be very useful to support reading and writing 
> to/from Thrift files with a native connector. 
> Functionality would include:
>  # read() - Reading from one or more Thrift files.
>  # write() - Writing to one or more Thrift files.





[jira] [Commented] (BEAM-2680) Improve scalability of the Watch transform

2019-08-05 Thread Eugene Kirpichov (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900155#comment-16900155
 ] 

Eugene Kirpichov commented on BEAM-2680:


Ah, it's a very important clarification that the source files actually no 
longer exist. We probably can't always remove them from state immediately, to 
accommodate eventually-consistent filesystems that might have a file flicker 
between polling rounds for a while, but we could after a certain timeout. We'd 
probably need to switch back to storing the actual keys instead of hashes, 
though.

> Improve scalability of the Watch transform
> --
>
> Key: BEAM-2680
> URL: https://issues.apache.org/jira/browse/BEAM-2680
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
>Priority: Major
>
> [https://github.com/apache/beam/pull/3565] introduces the Watch transform 
> [http://s.apache.org/beam-watch-transform].
> The implementation leaves several scalability-related TODOs:
>  1) The state stores hashes and timestamps of outputs that have already been 
> output and should be omitted from future polls. We could garbage-collect this 
> state, e.g. dropping elements from "completed" and from addNewAsPending() if 
> their timestamp is more than X behind the watermark.
>  2) When a poll returns a huge number of elements, we don't necessarily have 
> to add all of them into state.pending - instead we could add only N oldest 
> elements and ignore others, relying on future poll rounds to provide them, in 
> order to avoid blowing up the state. Combined with garbage collection of 
> GrowthState.completed, this would make the transform scalable to very large 
> poll results.
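TODO (1) above, garbage-collecting "completed" entries once they fall far enough behind the watermark, can be sketched as follows; the types are simplified (string keys, long millis) and the helper is illustrative only, not the Watch transform's state code:

```java
import java.util.HashMap;
import java.util.Map;

public class CompletedGcSketch {
    // Drop entries whose timestamp is more than maxAgeMillis behind the
    // watermark; returns the number of entries collected. The real state
    // keys hashes of previously-output elements.
    static int gc(Map<String, Long> completed, long watermarkMillis, long maxAgeMillis) {
        int before = completed.size();
        completed.values().removeIf(ts -> ts < watermarkMillis - maxAgeMillis);
        return before - completed.size();
    }

    public static void main(String[] args) {
        Map<String, Long> completed = new HashMap<>();
        completed.put("hash-a", 1_000L);
        completed.put("hash-b", 5_000L);
        completed.put("hash-c", 9_000L);
        // Drop anything more than 4s behind a watermark of 10s.
        int dropped = gc(completed, 10_000L, 4_000L);
        System.out.println(dropped);              // 2
        System.out.println(completed.keySet());   // [hash-c]
    }
}
```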





[jira] [Commented] (BEAM-2680) Improve scalability of the Watch transform

2019-08-05 Thread Eugene Kirpichov (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900047#comment-16900047
 ] 

Eugene Kirpichov commented on BEAM-2680:


Thanks for the heads up, Steve. Does the sharding workaround make sense for 
your use case? If not, then I guess it's time to implement garbage collection 
of completed items. Which still would be tricky, so I'd recommend discussing a 
design on dev@ first.



[jira] [Commented] (BEAM-7866) Python MongoDB IO performance and correctness issues

2019-07-31 Thread Eugene Kirpichov (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897562#comment-16897562
 ] 

Eugene Kirpichov commented on BEAM-7866:


Yes, splitting by range of id is the right thing to do. And the range of ids 
has to be captured during split() rather than in the constructor - the 
constructor cannot do any IO.

I don't see anything in MongoDB documentation saying that the natural order is 
deterministic; 
https://stackoverflow.com/questions/15467099/mongodb-store-and-select-order 
says that it is not.

> Python MongoDB IO performance and correctness issues
> 
>
> Key: BEAM-7866
> URL: https://issues.apache.org/jira/browse/BEAM-7866
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Eugene Kirpichov
>Assignee: Yichi Zhang
>Priority: Blocker
> Fix For: 2.14.0
>
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
>  splits the query result by computing number of results in constructor, and 
> then in each reader re-executing the whole query and getting an index 
> sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily 
> deterministic, so the idea of index ranges on it is meaningless: each shard 
> may basically get random, possibly overlapping subsets of the total results
> - Even if you add order by `_id`, the database may be changing concurrently 
> to reading and splitting. E.g. if the database contained documents with ids 
> 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the 
> assumption that these shards would contain respectively 10 20 30, and 40 50), 
> and then suppose shard 10 20 30 is read and then document 25 is inserted - 
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and 
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items, 
> which in total is quadratic complexity
> - The query is first executed in the constructor in order to count results, 
> which 1) means the constructor can be super slow and 2) it won't work at all 
> if the database is unavailable at the time the pipeline is constructed (e.g. 
> if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: this class 
> has extensive coverage with it, and the tests pass. This is because the tests 
> return the same results in the same order. I don't know how to catch this 
> automatically, and I don't know how to catch the performance issue 
> automatically, but these would all be important follow-up items after the 
> actual fix.
> CC: [~chamikara] as reviewer.
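The duplicate/loss scenario from the second bullet can be reproduced with a small simulation; mongodbio itself is Python, but the failure mode is language-independent, and `readShards` below is a pure stand-in for "each shard re-executes the query and takes an index sub-range":

```java
import java.util.ArrayList;
import java.util.List;

public class IndexSplitSketch {
    // Simulates the race: shard 1 reads indices [0, 3), then a concurrent
    // insert shifts the indices, then shard 2 reads from index 3 onward.
    static List<List<Integer>> readShards() {
        List<Integer> db = new ArrayList<>(List.of(10, 20, 30, 40, 50));
        List<Integer> shard1 = new ArrayList<>(db.subList(0, 3)); // [10, 20, 30]
        db.add(2, 25); // concurrent insert; db = [10, 20, 25, 30, 40, 50]
        List<Integer> shard2 = new ArrayList<>(db.subList(3, db.size())); // [30, 40, 50]
        return List.of(shard1, shard2);
    }

    public static void main(String[] args) {
        List<List<Integer>> shards = readShards();
        System.out.println(shards.get(0)); // [10, 20, 30]
        System.out.println(shards.get(1)); // [30, 40, 50]: 30 is duplicated, 25 is lost
    }
}
```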





[jira] [Created] (BEAM-7866) Python MongoDB IO performance and correctness issues

2019-07-31 Thread Eugene Kirpichov (JIRA)
Eugene Kirpichov created BEAM-7866:
--

 Summary: Python MongoDB IO performance and correctness issues
 Key: BEAM-7866
 URL: https://issues.apache.org/jira/browse/BEAM-7866
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Reporter: Eugene Kirpichov
Assignee: Yichi Zhang
 Fix For: 2.14.0




[jira] [Commented] (BEAM-2857) Create FileIO in Python

2019-03-05 Thread Eugene Kirpichov (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16784832#comment-16784832
 ] 

Eugene Kirpichov commented on BEAM-2857:


The WriteFiles transform is a leftover of pre-FileIO.write() file writing 
functionality in Java, and I think there's even a JIRA somewhere out there to 
get rid of it or at least rewrite FileIO.write() as an independent transform.

It does not make sense to reimplement WriteFiles in Python - much better to 
implement FileIO.write independently. However, WriteFiles is a valuable source 
of insight for how to structure the implementation as it addresses many very 
subtle points related to data consistency, performance, batch/streaming 
unification etc. - one *needs* to completely understand the whole 
implementation of WriteFiles before embarking on an implementation of 
FileIO.write.

> Create FileIO in Python
> ---
>
> Key: BEAM-2857
> URL: https://issues.apache.org/jira/browse/BEAM-2857
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Eugene Kirpichov
>Assignee: Pablo Estrada
>Priority: Major
>  Labels: gsoc, gsoc2019, mentor, triaged
>
> Beam Java has a FileIO with operations: match()/matchAll(), readMatches(), 
> which together cover the majority of needs for general-purpose file 
> ingestion. Beam Python should have something similar.
> An early design document for this: https://s.apache.org/fileio-beam-python





[jira] [Assigned] (BEAM-3772) BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS

2019-01-11 Thread Eugene Kirpichov (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Kirpichov reassigned BEAM-3772:
--

Assignee: Chamikara Jayalath  (was: Eugene Kirpichov)

> BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded 
> PCollection and FILE_LOADS
> 
>
> Key: BEAM-3772
> URL: https://issues.apache.org/jira/browse/BEAM-3772
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Affects Versions: 2.2.0, 2.3.0
> Environment: Dataflow streaming pipeline
>Reporter: Benjamin BENOIST
>Assignee: Chamikara Jayalath
>Priority: Major
>
> My workflow : KAFKA -> Dataflow streaming -> BigQuery
> Given that having low-latency isn't important in my case, I use FILE_LOADS to 
> reduce the costs. I'm using _BigQueryIO.Write_ with a _DynamicDestination_, 
> which is a table with the current hour as a suffix.
> This _BigQueryIO.Write_ is configured like this :
> {code:java}
> .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
> .withMethod(Method.FILE_LOADS)
> .withTriggeringFrequency(triggeringFrequency)
> .withNumFileShards(100)
> {code}
> The first table is successfully created and is written to. But then the 
> following tables are never created and I get these exceptions:
> {code:java}
> (99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job 
> with id prefix 
> 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_1_00023,
>  reached max retries: 3, last failed load job: {
>   "configuration" : {
> "load" : {
>   "createDisposition" : "CREATE_NEVER",
>   "destinationTable" : {
> "datasetId" : "dev_mydataset",
> "projectId" : "myproject-id",
> "tableId" : "mytable_20180302_16"
>   },
> {code}
> The _CreateDisposition_ used is _CREATE_NEVER_, contrary to the 
> _CREATE_IF_NEEDED_ that was specified.


