[jira] [Commented] (BEAM-3778) Very poor performance of side inputs when input is finely sharded
[ https://issues.apache.org/jira/browse/BEAM-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386884#comment-16386884 ]

Eugene Kirpichov commented on BEAM-3778:

More precisely: https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java#L1083

"Each bundle maps to one file exactly" is not what we want here. Something that's passed to View.asIterable or View.asList is likely not so big as to try to save on a reshuffle, so we should just insert a Reshuffle.viaRandomKey() before the ParDo.

> Very poor performance of side inputs when input is finely sharded
> -----------------------------------------------------------------
>
>                 Key: BEAM-3778
>                 URL: https://issues.apache.org/jira/browse/BEAM-3778
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Eugene Kirpichov
>            Assignee: Luke Cwik
>            Priority: Major
>
> This thread:
> https://lists.apache.org/thread.html/324a4f86e567e3e1692466e70f44a08276123b467bacb2ecbf00515f@%3Cuser.beam.apache.org%3E
> The user has a job that reads a few hundred thousand files and then writes
> them to BigQuery. This generates 1 temp file per input file. Then we gather
> the temp files into a View.asList() side input - and this side input ends up
> containing a few hundred thousand tiny ISM files, with 1 element per file,
> which performs horribly (taking hours to read the side input).
> I think we need to reshuffle things onto a reasonable number of shards before
> writing them to ISM.
> A side issue: this
> https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java#L46
> also triggers the coder size-estimation logic, which falsely thinks that
> size estimation in this case is cheap, and does double the work, as evidenced
> by the following stack trace:
>
> Processing lull for PT30900.015S in state process of
> WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
> java.net.SocketInputStream.socketRead0(Native Method)
> java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> java.net.SocketInputStream.read(SocketInputStream.java:170)
> java.net.SocketInputStream.read(SocketInputStream.java:141)
> sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
> sun.security.ssl.InputRecord.read(InputRecord.java:503)
> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
> sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
> java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
> java.io.BufferedInputStream.read(BufferedInputStream.java:345)
> sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
> sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
> sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
> java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
> sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
> com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
> com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
> com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
> com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:380)
> com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:4784)
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStreamAndSetMetadata(GoogleCloudStorageReadChannel.java:656)
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:560)
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:289)
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> java.io.InputStream.read(InputStream.java:101)
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:81)
> org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:79)
> org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:63)
> org.apache.beam.runners.dataflow.internal.IsmFormat$KeyPrefixCoder.decode(IsmFormat.java:694)
> com.google.cloud.dataflow.worker.IsmReader.readKey(IsmReader.java:999)
> com.google.cloud.dataflow.worker.IsmReader.access$2000(IsmReader.java:79)
> com.google.cloud.dataflow
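The fix proposed in the comment above can be sketched at the pipeline level. This is not the actual BatchViewOverrides change; the class and variable names (`ReshuffledSideInput`, `tempFileNames`) are illustrative, and `Create.of(...)` stands in for the finely sharded PCollection from the real job:

```java
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class ReshuffledSideInput {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Stands in for the finely sharded output (e.g. one temp-file name per
    // input file); in the reported job this arrived as hundreds of thousands
    // of single-element bundles.
    PCollection<String> tempFileNames =
        p.apply(Create.of("gs://tmp/file-00000", "gs://tmp/file-00001"));

    // Redistribute onto a reasonable number of shards BEFORE materializing
    // the View.asList() side input, so the Dataflow runner writes a few
    // well-filled ISM files instead of one tiny ISM file per input bundle.
    PCollectionView<List<String>> sideInput =
        tempFileNames
            .apply(Reshuffle.viaRandomKey())
            .apply(View.asList());

    p.run().waitUntilFinish();
  }
}
```

The same placement is what the second comment below suggests for BigQueryIO itself: apply the reshuffle to the collection feeding ReifyAsIterable, so bundle boundaries (and hence ISM file boundaries) no longer mirror the input sharding.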
[jira] [Commented] (BEAM-3778) Very poor performance of side inputs when input is finely sharded
[ https://issues.apache.org/jira/browse/BEAM-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386874#comment-16386874 ]

Eugene Kirpichov commented on BEAM-3778:

Meanwhile, BigQueryIO per se can circumvent this issue by reshuffling the input that goes into ReifyAsIterable.