Sandeep, you need to add a sort to your query before you write it to
Iceberg. Iceberg's Spark writer opens one file at a time and closes it as
soon as the partition value changes, so the data for each partition needs
to be clustered together. To do that, add an `order by` with your partition
columns first, then the columns you're going to use for filtering, and
finally a high-cardinality field, like an ID.
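
Something like this should work (a sketch only: "df" stands for your
dataset, company_type and the table location come from your logs, and "id"
is a placeholder for any high-cardinality column; keep whatever write call
you already use):

    import static org.apache.spark.sql.functions.col;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> df = ...;  // the dataset you're writing today

    // Cluster rows so each write task sees one partition at a time:
    // partition column first, then filter columns, then a high-cardinality id.
    Dataset<Row> clustered = df.orderBy(col("company_type"), col("id"));

    // Then write the sorted dataset the same way you already do, e.g.:
    clustered.write()
        .format("iceberg")
        .mode("append")
        .save("s3a://tahoe-dev-today/");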

On Fri, Apr 5, 2019 at 4:57 PM Sandeep Sagar <sandeep.sa...@meltwater.com>
wrote:

> Yes, that issue is resolved with commit 0dbcd5c. Thanks!
>
>
>
> But now the update is failing with the latest build. Stack trace below.
>
>
>
> My HelloWorld code is posted at
> https://github.com/masterchief2007/floeberg .
>
>
>
> thanks
>
> -Sandeep
>
>
>
> 2019-04-05 16:47:37,142  INFO [main] (apache.iceberg.BaseTableScan:166) -
> Scanning table s3a://tahoe-dev-today/ snapshot 5212062174936997716 created
> at 2019-04-05 16:45:21.445 with filter
> (not(not_null(ref(name="company_type"))) or not(ref(name="company_type") ==
> "test-1"))
>
> 2019-04-05 16:47:55,794  INFO [main] (apache.iceberg.BaseTableScan:166) -
> Scanning table s3a://tahoe-dev-today/ snapshot 5212062174936997716 created
> at 2019-04-05 16:45:21.445 with filter (ref(name="company_type") ==
> "test-1" and not_null(ref(name="company_type")))
>
>
>
> 2019-04-05 16:48:45,001  WARN [Executor task launch worker for task 111]
> (spark.source.Writer$PartitionedWriter:336) - Duplicate key: [test-5] ==
> [test-5]
>
> 2019-04-05 16:48:45,003 ERROR [Executor task launch worker for task 111]
> (spark.internal.Logging:91) - Aborting task
>
> java.lang.IllegalStateException: Already closed file for partition:
> company_type=test-5
>
>                 at
> org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:337)
>
>                 at
> org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:298)
>
>                 at
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:118)
>
>                 at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>
>                 at
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
>
>                 at
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
>
>                 at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>
>                 at org.apache.spark.scheduler.Task.run(Task.scala:121)
>
>                 at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
>
>                 at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>
>                 at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>
>                 at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>                 at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>                 at java.lang.Thread.run(Thread.java:748)
>
>
>
> ……
>
> ……
>
> Driver stacktrace:
>
>                 at
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1887)
>
>                 at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1875)
>
>                 at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1874)
>
>                 at
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>
>                 at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>
>                 at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>
>                 at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
>
>                 at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
>
>                 at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
>
>                 at scala.Option.foreach(Option.scala:274)
>
>                 at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
>
>                 at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
>
>                 at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
>
>                 at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
>
>                 at
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>
>                 at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
>
>                 at
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>
>                 at
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
>
>                 ... 16 more
>
> Caused by: java.lang.IllegalStateException: Already closed file for
> partition: company_type=test-5
>
>                 at
> org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:337)
>
>                 at
> org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:298)
>
>                 at
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:118)
>
>                 at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>
>                 at
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
>
>                 at
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
>
>                 at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>
>                 at org.apache.spark.scheduler.Task.run(Task.scala:121)
>
>                 at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
>
>                 at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>
>                 at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>
>                 at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>                 at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>                 at java.lang.Thread.run(Thread.java:748)
>
> 2019-04-05 16:49:10,654  INFO [pool-2-thread-1]
> (jetty.server.AbstractConnector:318) - Stopped
> Spark@32130e61{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
>
>
>
>
>
> From: Sandeep Sagar <sandeep.sa...@meltwater.com>
> Date: Friday, April 5, 2019 at 4:41 PM
> To: "dev@iceberg.apache.org" <dev@iceberg.apache.org>, "rb...@netflix.com" <rb...@netflix.com>
> Subject: Re: Help: Timeout waiting for connection from pool (S3)
>
>
>
> Yes, I saw the commit just now and am building. Will update.
>
> The task logs do complain about unclosed input streams in the finalizer.
>
>
>
> 2019-04-05 16:20:21,303 WARN Finalizer
> org.apache.iceberg.hadoop.HadoopStreams - Unclosed input stream created by:
>
>
> org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.<init>(HadoopStreams.java:77)
>
>        org.apache.iceberg.hadoop.HadoopStreams.wrap(HadoopStreams.java:52)
>
>
> org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:125)
>
>
> org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:95)
>
>        org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
>
>        com.google.common.collect.Iterables$4.iterator(Iterables.java:559)
>
>        com.google.common.collect.Iterables$5.iterator(Iterables.java:698)
>
>        org.apache.iceberg.ManifestReader.iterator(ManifestReader.java:240)
>
>
> org.apache.iceberg.FilteredManifest.iterator(FilteredManifest.java:128)
>
>        com.google.common.collect.Iterables$5.iterator(Iterables.java:698)
>
>
> org.apache.iceberg.util.ParallelIterable$ParallelIterator.lambda$null$0(ParallelIterable.java:60)
>
>
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
>        java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>        java.lang.Thread.run(Thread.java:748)
>
>
>
> thanks
>
>
>
> From: Ryan Blue <rb...@netflix.com.INVALID>
> Reply-To: "dev@iceberg.apache.org" <dev@iceberg.apache.org>, "rb...@netflix.com" <rb...@netflix.com>
> Date: Friday, April 5, 2019 at 4:38 PM
> To: Iceberg Dev List <dev@iceberg.apache.org>
> Subject: Re: Help: Timeout waiting for connection from pool (S3)
>
>
>
> Can you try with the current master? I just merged a fix for a file leak
> that could cause this.
>
>
>
> This also looks like it is happening in a task, not on the driver. Do your
> task logs complain about closing files in a finalizer? We added a finalizer
> that will log when open files are leaked so we can clean them up. That's
> how we caught the scan ones.
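>
> The pattern is roughly this (just an illustrative sketch, not the actual
> HadoopStreams code): wrap the stream, capture a stack trace when it is
> created, and warn from finalize() if close() was never called.
>
>     import java.io.FilterInputStream;
>     import java.io.IOException;
>     import java.io.InputStream;
>
>     // Illustrative sketch only: remember where the stream was opened and
>     // log it from the finalizer if it was never closed.
>     class LeakLoggingInputStream extends FilterInputStream {
>       private final Exception createStack = new Exception("Stream created by:");
>       private volatile boolean closed = false;
>
>       LeakLoggingInputStream(InputStream in) {
>         super(in);
>       }
>
>       @Override
>       public void close() throws IOException {
>         closed = true;
>         super.close();
>       }
>
>       @Override
>       protected void finalize() throws Throwable {
>         if (!closed) {
>           // This is what shows up as the "Unclosed input stream created by:"
>           // warning in the task logs, with the creation stack trace attached.
>           createStack.printStackTrace();
>           close();  // best-effort cleanup so the connection is released
>         }
>         super.finalize();
>       }
>     }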
>
>
>
> On Fri, Apr 5, 2019 at 4:28 PM Sandeep Sagar <sandeep.sa...@meltwater.com>
> wrote:
>
> Hi all,
>
> I need some help understanding why I am running into:
>
>      com.amazonaws.SdkClientException: Unable to execute HTTP request:
> Timeout waiting for connection from pool
>
>
>
> I have written a simple program that just saves a dataset and reads it
> back. This works when writing to local disk.
>
> When I changed it to use S3, I get the issue.
>
> The save to S3 itself is ok.
>
> Is it possible that the S3ObjectInputStream is not being closed somewhere,
> leading to the connection pool being exhausted?
>
>
>
> I am using the latest build of Iceberg.
>
> Regards
>
> Sandeep
>
> Stack trace:
>
> 2019-04-05 15:31:55,224 INFO main org.apache.iceberg.TableScan - Scanning
> table s3a://tahoe-dev-today/ snapshot 7179336048327305337 created at
> 2019-04-05 15:30:46.616 with filter true
>
>
>
>
>
> 2019-04-05 15:37:53,816 ERROR Executor task launch worker for task 8
> org.apache.spark.executor.Executor - Exception in task 0.0 in stage 1.0
> (TID 8)
>
> org.apache.iceberg.exceptions.RuntimeIOException: Failed to get status for
> file:
> s3a://tahoe-dev-today/data/company_type=test-2/00000-0-b1255a21-99f3-4005-9b29-999bf1862e34.parquet
>
>        at
> org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:108)
>
>        at
> org.apache.iceberg.hadoop.HadoopInputFile.getStat(HadoopInputFile.java:136)
>
>        at org.apache.iceberg.parquet.ParquetIO.file(ParquetIO.java:57)
>
>        at
> org.apache.iceberg.parquet.ParquetReader$ReadConf.newReader(ParquetReader.java:163)
>
>        at
> org.apache.iceberg.parquet.ParquetReader$ReadConf.<init>(ParquetReader.java:81)
>
>        at
> org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:174)
>
>        at
> org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:185)
>
>        at
> org.apache.iceberg.spark.source.Reader$TaskDataReader.open(Reader.java:442)
>
>        at
> org.apache.iceberg.spark.source.Reader$TaskDataReader.open(Reader.java:382)
>
>        at
> org.apache.iceberg.spark.source.Reader$TaskDataReader.<init>(Reader.java:317)
>
>        at
> org.apache.iceberg.spark.source.Reader$ReadTask.createPartitionReader(Reader.java:266)
>
>        at
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:41)
>
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>
>        at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>
>        at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>
>        at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>
>        at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>
>        at org.apache.spark.scheduler.Task.run(Task.scala:121)
>
>        at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
>
>        at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>
>        at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>
>        at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>        at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>        at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.io.InterruptedIOException: getFileStatus on
> s3a://tahoe-dev-today/data/company_type=test-2/00000-0-b1255a21-99f3-4005-9b29-999bf1862e34.parquet:
> com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout
> waiting for connection from pool
>
>        at
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:141)
>
>        at
> org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:117)
>
>        at
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:1844)
>
>        at
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:1808)
>
>        at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1748)
>
>        at
> org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:106)
>
>        ... 30 more
>
> Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP
> request: Timeout waiting for connection from pool
>
>        at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1113)
>
>        at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1063)
>
>        at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>
>        at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>
>        at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>
>        at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>
>        at
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>
>        at
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>
>        at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4247)
>
>        at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
>
>        at
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1253)
>
>        at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1038)
>
>        at
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:1826)
>
>        ... 33 more
>
> Caused by:
> com.amazonaws.thirdparty.apache.http.conn.ConnectionPoolTimeoutException:
> Timeout waiting for connection from pool
>
>        at
> com.amazonaws.thirdparty.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
>
>        at
> com.amazonaws.thirdparty.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
>
>        at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>
>        at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>        at java.lang.reflect.Method.invoke(Method.java:498)
>
>        at
> com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>
>        at com.amazonaws.http.conn.$Proxy8.get(Unknown Source)
>
>        at
> com.amazonaws.thirdparty.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
>
>        at
> com.amazonaws.thirdparty.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
>
>        at
> com.amazonaws.thirdparty.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
>
>        at
> com.amazonaws.thirdparty.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
>
>        at
> com.amazonaws.thirdparty.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
>
>        at
> com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>
>        at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1235)
>
>        at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
>
>        ... 44 more
>
> 2019-04-05 15:37:53,842 ERROR task-result-getter-3
> org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 1.0 failed 1
> times; aborting job
>
> 2019-04-05 15:37:53,870 INFO pool-2-thread-1
> org.spark_project.jetty.server.AbstractConnector - Stopped Spark@25e8e59
> {HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
>
>
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix
