[jira] [Commented] (BEAM-12044) JdbcIO should explicitly setAutoCommit to false
[ https://issues.apache.org/jira/browse/BEAM-12044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308870#comment-17308870 ]

Eugene Kirpichov commented on BEAM-12044:
-----------------------------------------

[~aromanenko] my take: it is possible, but I think Sylvain's PR improves the default behavior for all users, so we should do it by default. Because this is in JdbcIO.read(), there are no commits happening anyway, so this should be purely a performance improvement with no semantic changes. Actually, there's one possible caveat that I'll mention on the PR in a moment.

> JdbcIO should explicitly setAutoCommit to false
> -----------------------------------------------
>
> Key: BEAM-12044
> URL: https://issues.apache.org/jira/browse/BEAM-12044
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.28.0
> Reporter: Sylvain Veyrié
> Priority: P2
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Hello,
>
> Per the [PostgreSQL JDBC documentation|https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor], autocommit must be explicitly disabled on the connection to allow cursor streaming.
>
> [~jkff] mentioned it [on the mailing list|https://www.mail-archive.com/dev@beam.apache.org/msg16808.html]; however, even though there is
>
> {code:java}
> poolableConnectionFactory.setDefaultAutoCommit(false);
> {code}
>
> at JdbcIO:1555, currently, at least with JDBC driver 42.2.16, any read with JdbcIO will memoize the whole dataset (which leads to OOM), since
>
> {code:java}
> connection.getAutoCommit()
> {code}
>
> returns true in JdbcIO#ReadFn#processElement.
>
> I can provide a PR — the patch is pretty simple (and solves the problem for us in 2.28.0):
>
> {code:java}
> if (connection == null) {
>   connection = dataSource.getConnection();
> }
> connection.setAutoCommit(false); // line added
> {code}
>
> Thanks!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (BEAM-12044) JdbcIO should explicitly setAutoCommit to false
[ https://issues.apache.org/jira/browse/BEAM-12044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308322#comment-17308322 ]

Eugene Kirpichov commented on BEAM-12044:
-----------------------------------------

When glancing over the code it seems like the setDefaultAutoCommit line in Beam JdbcIO actually is not executed - it's used only when you use PoolableDataSourceProvider, it's not used by default, and the scio JdbcIO isn't using it either.

I think your proposed patch makes sense, please send a PR, thanks! Just please add a comment explaining why the connection must not be in autocommit mode. A link to the Postgres documentation might be good enough, as long as there are no other database engines requiring the connection to be *in* autocommit mode to allow streaming, which seems unlikely.
[jira] [Commented] (BEAM-12044) JdbcIO should explicitly setAutoCommit to false
[ https://issues.apache.org/jira/browse/BEAM-12044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17307957#comment-17307957 ]

Eugene Kirpichov commented on BEAM-12044:
-----------------------------------------

[~sveyrie] Thanks! I no longer work on Beam, but am curious about this issue. Do I understand correctly that the Postgres driver ignores `poolableConnectionFactory.setDefaultAutoCommit(false);` and you need to explicitly `connection.setAutoCommit(false)`? That sounds like a very unlikely bug in the driver, so I suspect that something else is going on - either a bug in JdbcIO, or a bug in your pipeline. Could you give a code snippet of how you instantiate JdbcIO in your pipeline?
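For readers landing on this thread: the fix under discussion boils down to the standard Postgres cursor-streaming setup. A minimal plain-JDBC sketch of that setup (illustrative only, not Beam's actual JdbcIO code; the class name, query, and fetch size are placeholders):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class StreamingReadSketch {

  // Configure a statement so the Postgres driver streams results through a
  // server-side cursor instead of materializing the whole result set.
  public static PreparedStatement streamingStatement(
      Connection connection, String query, int fetchSize) throws SQLException {
    // Cursor-based fetching only works while autocommit is off; with
    // autocommit on, the driver fetches everything eagerly (the OOM above).
    connection.setAutoCommit(false);
    PreparedStatement statement =
        connection.prepareStatement(
            query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    // The default fetch size of 0 also means "fetch all rows at once",
    // so set an explicit bound.
    statement.setFetchSize(fetchSize);
    return statement;
  }
}
```

Rows are then consumed batch by batch via statement.executeQuery() and ResultSet.next(), and the connection is committed or rolled back once iteration finishes.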
[jira] [Updated] (BEAM-10702) Embedded job endpoint artifact service unzips PIP files, making them non-installable
[ https://issues.apache.org/jira/browse/BEAM-10702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eugene Kirpichov updated BEAM-10702:
------------------------------------
    Fix Version/s: 2.24.0

> Embedded job endpoint artifact service unzips PIP files, making them non-installable
> ------------------------------------------------------------------------------------
>
> Key: BEAM-10702
> URL: https://issues.apache.org/jira/browse/BEAM-10702
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.22.0
> Reporter: Eugene Kirpichov
> Assignee: Eugene Kirpichov
> Priority: P2
> Fix For: 2.24.0
> Time Spent: 6.5h
> Remaining Estimate: 0h
>
> For details see this thread:
> https://lists.apache.org/thread.html/r36931794495dc2745c39e11410577931a0a8cfb07a48253eaeda3246%40%3Cuser.beam.apache.org%3E
> The thread also contains a solution. I'll send a PR - filing the JIRA just to track.
[jira] [Updated] (BEAM-10702) Embedded job endpoint artifact service unzips PIP files, making them non-installable
[ https://issues.apache.org/jira/browse/BEAM-10702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eugene Kirpichov updated BEAM-10702:
------------------------------------
    Status: Resolved  (was: Open)
[jira] [Created] (BEAM-10702) Embedded job endpoint artifact service unzips PIP files, making them non-installable
Eugene Kirpichov created BEAM-10702:
------------------------------------

             Summary: Embedded job endpoint artifact service unzips PIP files, making them non-installable
                 Key: BEAM-10702
                 URL: https://issues.apache.org/jira/browse/BEAM-10702
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
    Affects Versions: 2.22.0
            Reporter: Eugene Kirpichov
            Assignee: Eugene Kirpichov
[jira] [Commented] (BEAM-9456) Upgrade to gradle 6.2
[ https://issues.apache.org/jira/browse/BEAM-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175823#comment-17175823 ]

Eugene Kirpichov commented on BEAM-9456:
----------------------------------------

FWIW - this issue is further exacerbated by the fact that the Gradle version currently used in Beam is incompatible with the latest JDK, e.g. with the JDK used by default on macOS Catalina:
https://github.com/gradle/gradle/issues/10248#issuecomment-647923554

> Upgrade to gradle 6.2
> ---------------------
>
> Key: BEAM-9456
> URL: https://issues.apache.org/jira/browse/BEAM-9456
> Project: Beam
> Issue Type: Task
> Components: build-system
> Reporter: Alex Van Boxel
> Priority: P2
[jira] [Commented] (BEAM-9316) FileIO.Write.relativeFileNaming should not be public
[ https://issues.apache.org/jira/browse/BEAM-9316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037332#comment-17037332 ]

Eugene Kirpichov commented on BEAM-9316:
----------------------------------------

Could you elaborate on why the current behavior is undesirable? Do I understand correctly that in the given example it'll write to gs://some/bucket/some/directory/? If so, that seems fairly intuitive to me, and arguably a useful thing to have: writing to subdirectories within a base directory specified with .to().

I think making relativeFileNaming private would be unfortunate: it's a backwards-incompatible change (it can break people who use it), and it seems useful to have something of that sort. As for checking whether a fileNaming is relative - I don't think it can be done in general (what if somebody writes a new FileNaming that, logically, is relative, but isn't constructed via relativeFileNaming?). I guess you could throw runtime errors if .to() is specified and the FileNaming *returns* something that contains a slash? But again, I'm not sure that's desirable.

I acknowledge that there is some confusion, but I think the best way to address it would be to rename things more clearly (and deprecate the old names), e.g. maybe something like to() -> toBaseDirectory() and relativeFileNaming() -> toSubDirectory()? There are probably better options too.
> FileIO.Write.relativeFileNaming should not be public
> ----------------------------------------------------
>
> Key: BEAM-9316
> URL: https://issues.apache.org/jira/browse/BEAM-9316
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files
> Reporter: Claire McGinty
> Priority: Major
>
> I think the existing FileIO.writeDynamic is a bit easy to misuse, as something like this looks correct, and compiles:
>
> {code}
> FileIO.writeDynamic()
>   .by(...)
>   .withNaming(new SerializableFunction[String, FileNaming] {
>     override def apply(str: String): FileNaming =
>       FileIO.Write.relativeFileNaming(
>         "some/directory",
>         new FileNaming {
>           override def getFilename(window: BoundedWindow, pane: PaneInfo, numShards: Int, shardIndex: Int, compression: Compression): String =
>             "some_filename.txt"
>         })
>   })
>   .via(...)
>   .to("gs://some/bucket")
> {code}
>
> However, for dynamic writes, if `outputDirectory` (.to("...")) is set, under the hood Beam will wrap the provided `fileNamingFn` in `FileIO.Write.relativeFileNaming(...)` as well, so it ends up as a nested `relativeFileNaming` function.
> (https://github.com/apache/beam/blob/da9e17288e8473925674a4691d9e86252e67d7d7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1243)
>
> IMO, `relativeFileNaming` should either be made private, so that it's only used internally by FileIO.Write, or a precondition should be added when a dynamic FileIO.Write is expanded, to check that `outputDirectory` can't be set if the provided `fileNamingFn` is relative.
>
> wdyt?
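To make the double-wrapping concrete, here is a toy model of relative naming. This is a hypothetical simplification: the real FileNaming also receives window, pane, shard, and compression arguments, and the class and method names below are invented for illustration.

```java
import java.util.function.Supplier;

public class RelativeNamingSketch {

  // Toy stand-in for FileIO's FileNaming: just produces a file name.
  // relative(dir, naming) mirrors what FileIO.Write.relativeFileNaming does:
  // prefix whatever the inner naming produces with a directory.
  public static Supplier<String> relative(String directory, Supplier<String> naming) {
    return () -> directory + "/" + naming.get();
  }

  // What writeDynamic effectively does when .to(outputDirectory) is set:
  // wrap the user-provided naming fn in relativeFileNaming once more.
  public static String resolve(String outputDirectory, Supplier<String> userNaming) {
    return relative(outputDirectory, userNaming).get();
  }
}
```

With .to("gs://some/bucket") set, the user's already-relative naming fn gets wrapped once more, so the file lands under gs://some/bucket/some/directory/, a subdirectory of the base, which is the behavior the comment above argues is actually useful.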
[jira] [Commented] (BEAM-8942) Improve support for lambdas that throw checked exceptions in various transforms
[ https://issues.apache.org/jira/browse/BEAM-8942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992989#comment-16992989 ]

Eugene Kirpichov commented on BEAM-8942:
----------------------------------------

Beam Java has a few interfaces for functions - SerializableFunction (doesn't throw, takes lambdas), ProcessFunction (throws, takes lambdas), InferableFunction (throws, doesn't take lambdas). Just having 3 interfaces is already confusing - it seems like SerializableFunction should be deleted or deprecated.

However, we should make it so that the various PTransforms taking lambdas, e.g. Map/FlatMapElements.via(), FileIO.write().by() etc., can all take throwing lambdas. That might be by removing the SerializableFunction overloads, by adding new methods, or by any other means.

> Improve support for lambdas that throw checked exceptions in various transforms
> -------------------------------------------------------------------------------
>
> Key: BEAM-8942
> URL: https://issues.apache.org/jira/browse/BEAM-8942
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Tomo Suzuki
> Priority: Major
>
> Ticket to follow up the below. [~jkff] says:
> {quote}Would be nice (not in this PR) to make it easier to do MapElements.via() with throwing functions without the hassle of fn(lambda, Requirements.empty()).
> {quote}
> https://github.com/apache/beam/pull/10256/files#r356282252
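The lambda-friendliness distinction the comment draws can be shown with two toy functional interfaces. The names are invented for illustration; they only mimic the shape of Beam's SerializableFunction and ProcessFunction, not their actual definitions.

```java
import java.io.Serializable;

public class ThrowingFnSketch {

  // Shape of SerializableFunction: apply declares no checked exceptions,
  // so a lambda body that calls IO methods will not compile against it.
  public interface PlainFn<InputT, OutputT> extends Serializable {
    OutputT apply(InputT input);
  }

  // Shape of ProcessFunction: apply throws Exception, so lambdas with
  // checked exceptions can be passed directly, without the
  // fn(lambda, Requirements.empty()) wrapping mentioned in the description.
  public interface ThrowingFn<InputT, OutputT> extends Serializable {
    OutputT apply(InputT input) throws Exception;
  }

  public static <InputT, OutputT> OutputT run(ThrowingFn<InputT, OutputT> fn, InputT input)
      throws Exception {
    return fn.apply(input);
  }
}
```

A lambda like s -> { if (s.isEmpty()) throw new java.io.IOException("empty"); return s.length(); } typechecks as a ThrowingFn but not as a PlainFn, which is exactly why transforms accepting only a SerializableFunction-style overload force the wrapping.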
[jira] [Updated] (BEAM-8942) Improve support for lambdas that throw checked exceptions in various transforms
[ https://issues.apache.org/jira/browse/BEAM-8942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eugene Kirpichov updated BEAM-8942:
-----------------------------------
    Summary: Improve support for lambdas that throw checked exceptions in various transforms  (was: MapElements.via to work with lambda that has IOException)
[jira] [Commented] (BEAM-8561) Add ThriftIO to Support IO for Thrift Files
[ https://issues.apache.org/jira/browse/BEAM-8561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968834#comment-16968834 ]

Eugene Kirpichov commented on BEAM-8561:
----------------------------------------

Please above all make sure that this connector provides APIs compatible with FileIO, i.e. ThriftIO.readFiles() and ThriftIO.sink(). Providing ThriftIO.read() and write() for basic use cases also makes sense, but there's no need to make them super customizable, or to provide readAll() - all advanced use cases can be handled by the two methods above + FileIO.

> Add ThriftIO to Support IO for Thrift Files
> -------------------------------------------
>
> Key: BEAM-8561
> URL: https://issues.apache.org/jira/browse/BEAM-8561
> Project: Beam
> Issue Type: New Feature
> Components: io-java-files
> Reporter: Chris Larsen
> Assignee: Chris Larsen
> Priority: Minor
>
> Similar to AvroIO, it would be very useful to support reading and writing to/from Thrift files with a native connector.
> Functionality would include:
> # read() - Reading from one or more Thrift files.
> # write() - Writing to one or more Thrift files.
[jira] [Commented] (BEAM-2680) Improve scalability of the Watch transform
[ https://issues.apache.org/jira/browse/BEAM-2680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900155#comment-16900155 ]

Eugene Kirpichov commented on BEAM-2680:
----------------------------------------

Ah, it's a very important clarification that the source files actually no longer exist. We probably can't always immediately do that, to accommodate eventually-consistent filesystems that might have the file flicker between polling rounds for a while, but after a certain timeout we could do that. We'd probably need to switch back to storing the actual keys instead of hashes, though.

> Improve scalability of the Watch transform
> ------------------------------------------
>
> Key: BEAM-2680
> URL: https://issues.apache.org/jira/browse/BEAM-2680
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Reporter: Eugene Kirpichov
> Assignee: Eugene Kirpichov
> Priority: Major
>
> [https://github.com/apache/beam/pull/3565] introduces the Watch transform [http://s.apache.org/beam-watch-transform].
> The implementation leaves several scalability-related TODOs:
> 1) The state stores hashes and timestamps of outputs that have already been output and should be omitted from future polls. We could garbage-collect this state, e.g. dropping elements from "completed" and from addNewAsPending() if their timestamp is more than X behind the watermark.
> 2) When a poll returns a huge number of elements, we don't necessarily have to add all of them into state.pending - instead we could add only the N oldest elements and ignore the others, relying on future poll rounds to provide them, in order to avoid blowing up the state. Combined with garbage collection of GrowthState.completed, this would make the transform scalable to very large poll results.
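The garbage-collection idea from point 1 of the description (drop completed entries once they are far enough behind the watermark that they can no longer reappear in a poll) is simple to sketch. The names and the flat map representation are assumptions for illustration, not Watch's actual GrowthState layout.

```java
import java.util.Map;

public class CompletedGcSketch {

  // Drop completed-output entries whose timestamp lags the watermark by more
  // than maxLagMillis: an entry that old can no longer "flicker" back into a
  // poll result, so it is safe to forget that it was already output.
  public static void gc(
      Map<String, Long> completedTimestamps, long watermarkMillis, long maxLagMillis) {
    completedTimestamps.values().removeIf(ts -> ts < watermarkMillis - maxLagMillis);
  }
}
```

The trade-off mentioned in the comment applies: to make the timeout check possible against eventually-consistent filesystems, the state would need to keep the actual keys (file names) rather than hashes.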
[jira] [Commented] (BEAM-2680) Improve scalability of the Watch transform
[ https://issues.apache.org/jira/browse/BEAM-2680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900047#comment-16900047 ]

Eugene Kirpichov commented on BEAM-2680:
----------------------------------------

Thanks for the heads up, Steve. Does the sharding workaround make sense for your use case? If not, then I guess it's time to implement garbage collection of completed items. Which still would be tricky, so I'd recommend discussing a design on dev@ first.
[jira] [Commented] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897562#comment-16897562 ]

Eugene Kirpichov commented on BEAM-7866:
----------------------------------------

Yes, splitting by range of _id is the right thing to do. And the range of ids has to be captured during split() rather than in the constructor - the constructor cannot do any IO. I don't see anything in the MongoDB documentation saying that the natural order is deterministic; https://stackoverflow.com/questions/15467099/mongodb-store-and-select-order says that it is not.

> Python MongoDB IO performance and correctness issues
> ----------------------------------------------------
>
> Key: BEAM-7866
> URL: https://issues.apache.org/jira/browse/BEAM-7866
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Eugene Kirpichov
> Assignee: Yichi Zhang
> Priority: Blocker
> Fix For: 2.14.0
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py splits the query result by computing the number of results in the constructor, and then in each reader re-executing the whole query and getting an index sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily deterministic, so the idea of index ranges on it is meaningless: each shard may basically get random, possibly overlapping subsets of the total results.
> - Even if you add order by `_id`, the database may be changing concurrently to reading and splitting. E.g. if the database contained documents with ids 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the assumption that these shards would contain respectively 10 20 30, and 40 50), and then suppose shard 10 20 30 is read and then document 25 is inserted - then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items, which in total is quadratic complexity.
> - The query is first executed in the constructor in order to count results, which 1) means the constructor can be super slow and 2) it won't work at all if the database is unavailable at the time the pipeline is constructed (e.g. if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: this class has extensive coverage with it, and the tests pass. This is because the tests return the same results in the same order. I don't know how to catch this automatically, and I don't know how to catch the performance issue automatically, but these would all be important follow-up items after the actual fix.
> CC: [~chamikara] as reviewer.
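The duplication/loss scenario in the description can be reproduced with a small simulation (hypothetical helper names; index-based splitting mimics the broken behavior, key-range splitting the proposed fix):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MongoSplitSketch {

  // Broken approach: each shard re-runs the query against the *current*
  // collection state and takes a positional sub-range of the sorted results.
  public static List<Integer> byIndexRange(List<Integer> ids, int from, int to) {
    List<Integer> sorted = new ArrayList<>(ids);
    Collections.sort(sorted);
    return sorted.subList(from, Math.min(to, sorted.size()));
  }

  // Fixed approach: shard boundaries are _id values captured at split() time,
  // so a concurrent insert cannot shift existing documents between shards.
  public static List<Integer> byKeyRange(List<Integer> ids, int lowInclusive, int highExclusive) {
    List<Integer> out = new ArrayList<>();
    for (int id : ids) {
      if (id >= lowInclusive && id < highExclusive) {
        out.add(id);
      }
    }
    return out;
  }
}
```

Replaying the ticket's scenario (ids 10 20 30 40 50, document 25 inserted between shard reads) shows index-based shards both containing 30 while 25 is read by neither, whereas key ranges fixed at split() time stay disjoint.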
[jira] [Created] (BEAM-7866) Python MongoDB IO performance and correctness issues
Eugene Kirpichov created BEAM-7866:
-----------------------------------

             Summary: Python MongoDB IO performance and correctness issues
                 Key: BEAM-7866
                 URL: https://issues.apache.org/jira/browse/BEAM-7866
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
            Reporter: Eugene Kirpichov
            Assignee: Yichi Zhang
             Fix For: 2.14.0
[jira] [Commented] (BEAM-2857) Create FileIO in Python
[ https://issues.apache.org/jira/browse/BEAM-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16784832#comment-16784832 ]

Eugene Kirpichov commented on BEAM-2857:
----------------------------------------

The WriteFiles transform is a leftover of pre-FileIO.write() file writing functionality in Java, and I think there's even a JIRA somewhere out there to get rid of it, or at least rewrite FileIO.write() as an independent transform. It does not make sense to reimplement WriteFiles in Python - it is much better to implement FileIO.write independently.

However, WriteFiles is a valuable source of insight for how to structure the implementation, as it addresses many very subtle points related to data consistency, performance, batch/streaming unification etc. - one *needs* to completely understand the whole implementation of WriteFiles before embarking on an implementation of FileIO.write.

> Create FileIO in Python
> -----------------------
>
> Key: BEAM-2857
> URL: https://issues.apache.org/jira/browse/BEAM-2857
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Eugene Kirpichov
> Assignee: Pablo Estrada
> Priority: Major
> Labels: gsoc, gsoc2019, mentor, triaged
>
> Beam Java has a FileIO with operations match()/matchAll() and readMatches(), which together cover the majority of needs for general-purpose file ingestion. Beam Python should have something similar.
> An early design document for this: https://s.apache.org/fileio-beam-python
[jira] [Assigned] (BEAM-3772) BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS
[ https://issues.apache.org/jira/browse/BEAM-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eugene Kirpichov reassigned BEAM-3772:
--------------------------------------
    Assignee: Chamikara Jayalath  (was: Eugene Kirpichov)

> BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS
> ---------------------------------------------------------------------------------------------------------
>
> Key: BEAM-3772
> URL: https://issues.apache.org/jira/browse/BEAM-3772
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.2.0, 2.3.0
> Environment: Dataflow streaming pipeline
> Reporter: Benjamin BENOIST
> Assignee: Chamikara Jayalath
> Priority: Major
>
> My workflow: KAFKA -> Dataflow streaming -> BigQuery
> Given that low latency isn't important in my case, I use FILE_LOADS to reduce the costs. I'm using _BigQueryIO.Write_ with a _DynamicDestination_, which is a table with the current hour as a suffix.
> This _BigQueryIO.Write_ is configured like this:
> {code:java}
> .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
> .withMethod(Method.FILE_LOADS)
> .withTriggeringFrequency(triggeringFrequency)
> .withNumFileShards(100)
> {code}
> The first table is successfully created and is written to. But then the following tables are never created and I get these exceptions:
> {code:java}
> (99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job with id prefix
> 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_1_00023,
> reached max retries: 3, last failed load job: {
>   "configuration" : {
>     "load" : {
>       "createDisposition" : "CREATE_NEVER",
>       "destinationTable" : {
>         "datasetId" : "dev_mydataset",
>         "projectId" : "myproject-id",
>         "tableId" : "mytable_20180302_16"
>       },
> {code}
> The _CreateDisposition_ used is _CREATE_NEVER_, contrary to _CREATE_IF_NEEDED_ as specified.