[jira] [Commented] (BEAM-3778) Very poor performance of side inputs when input is finely sharded

2018-03-05 Thread Eugene Kirpichov (JIRA)

[ https://issues.apache.org/jira/browse/BEAM-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386884#comment-16386884 ]

Eugene Kirpichov commented on BEAM-3778:


More precisely: 
[https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java#L1083]

"Each bundle maps to one file exactly" is not what we want here. Data passed to 
View.asIterable or View.asList is likely not so big that saving a reshuffle 
matters, so we should just insert a Reshuffle.viaRandomKey() before the ParDo.
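To illustrate the effect: this plain-JDK sketch (not Beam code; the class name, shard count, and file names are hypothetical) simulates the redistribution that Reshuffle.viaRandomKey() asks the runner to perform. Each element is keyed with a random key drawn from a bounded range and then grouped by that key, so hundreds of thousands of single-element bundles collapse into a bounded number of shards, and downstream each shard writes one reasonably sized file instead of one tiny file per element.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: mimics what a runner does after Reshuffle.viaRandomKey().
public class RandomKeyReshard {

  // Assign each element a random key in [0, numShards) and group by that key.
  // The number of groups (and hence output files) is bounded by numShards,
  // no matter how finely sharded the input was.
  public static Map<Integer, List<String>> reshard(List<String> elements, int numShards) {
    Map<Integer, List<String>> shards = new HashMap<>();
    for (String e : elements) {
      int key = ThreadLocalRandom.current().nextInt(numShards);
      shards.computeIfAbsent(key, k -> new ArrayList<>()).add(e);
    }
    return shards;
  }

  public static void main(String[] args) {
    // 300,000 one-element "bundles" (temp files), at most 64 output shards.
    List<String> tempFiles = new ArrayList<>();
    for (int i = 0; i < 300_000; i++) {
      tempFiles.add("tempfile-" + i);
    }
    Map<Integer, List<String>> shards = reshard(tempFiles, 64);
    System.out.println("shards: " + shards.size());
  }
}
```

In Beam itself the grouping is done by the runner's shuffle, not in user code; the point is only that the shard count after the reshuffle is independent of the input's bundling.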

> Very poor performance of side inputs when input is finely sharded
> -----------------------------------------------------------------
>
> Key: BEAM-3778
> URL: https://issues.apache.org/jira/browse/BEAM-3778
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Eugene Kirpichov
>Assignee: Luke Cwik
>Priority: Major
>
> This thread:
> https://lists.apache.org/thread.html/324a4f86e567e3e1692466e70f44a08276123b467bacb2ecbf00515f@%3Cuser.beam.apache.org%3E
> The user has a job that reads a few hundred thousand files and then writes 
> them to BigQuery. This generates 1 temp file per input file. Then we gather 
> the temp files into a View.asList() side input - and this side input ends up 
> containing a few hundred thousand tiny ISM files, with 1 element per file, 
> which performs horribly (taking hours to read the side input).
> I think we need to reshuffle things onto a reasonable number of shards before 
> writing them to ISM.
> A side issue: this 
> https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java#L46
>  also triggers the coder size-estimation logic, which wrongly assumes that 
> size estimation is cheap in this case and so does double the work, as 
> evidenced by the following stack trace:
> Processing lull for PT30900.015S in state process of 
> WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
>  java.net.SocketInputStream.socketRead0(Native Method)
>  java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>  java.net.SocketInputStream.read(SocketInputStream.java:170)
>  java.net.SocketInputStream.read(SocketInputStream.java:141)
>  sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>  sun.security.ssl.InputRecord.read(InputRecord.java:503)
>  sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
>  sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
>  sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
>  java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>  java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>  java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>  sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
>  sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
>  sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
>  sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
>  java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
>  sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
>  com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
>  com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
>  com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
>  com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
>  com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
>  com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:380)
>  com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:4784)
>  com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStreamAndSetMetadata(GoogleCloudStorageReadChannel.java:656)
>  com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:560)
>  com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:289)
>  sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
>  sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
>  sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>  java.io.InputStream.read(InputStream.java:101)
>  sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:81)
>  org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:79)
>  org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:63)
>  org.apache.beam.runners.dataflow.internal.IsmFormat$KeyPrefixCoder.decode(IsmFormat.java:694)
>  com.google.cloud.dataflow.worker.IsmReader.readKey(IsmReader.java:999)
>  com.google.cloud.dataflow.worker.IsmReader.access$2000(IsmReader.java:79)
>  com.google.cloud.dataflow

[jira] [Commented] (BEAM-3778) Very poor performance of side inputs when input is finely sharded

2018-03-05 Thread Eugene Kirpichov (JIRA)

[ https://issues.apache.org/jira/browse/BEAM-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386874#comment-16386874 ]

Eugene Kirpichov commented on BEAM-3778:


Meanwhile, BigQueryIO itself can work around this issue by reshuffling the 
input that goes into ReifyAsIterable.
