[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22399 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r219480345 --- Diff: sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java --- @@ -146,16 +146,11 @@ public UserGroupInformation getHttpUGI() { public synchronized void start() { super.start(); // Initialize and test a connection to the metastore -IMetaStoreClient metastoreClient = null; try { - metastoreClient = new HiveMetaStoreClient(hiveConf); - metastoreClient.getDatabases("default"); -} catch (Exception e) { - throw new ServiceException("Unable to connect to MetaStore!", e); -} -finally { - if (metastoreClient != null) { -metastoreClient.close(); + try (IMetaStoreClient metastoreClient = new HiveMetaStoreClient(hiveConf)) { --- End diff -- Good one, thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r218678719 --- Diff: common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java --- @@ -143,37 +143,38 @@ public void releaseBuffers() { } private FetchResult fetchChunks(List chunkIndices) throws Exception { -TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); -final Semaphore sem = new Semaphore(0); - final FetchResult res = new FetchResult(); --- End diff -- Any time --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r218635691 --- Diff: common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java --- @@ -143,37 +143,38 @@ public void releaseBuffers() { } private FetchResult fetchChunks(List chunkIndices) throws Exception { -TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); -final Semaphore sem = new Semaphore(0); - final FetchResult res = new FetchResult(); --- End diff -- I see, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r218532105 --- Diff: common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java --- @@ -143,37 +143,38 @@ public void releaseBuffers() { } private FetchResult fetchChunks(List chunkIndices) throws Exception { -TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); -final Semaphore sem = new Semaphore(0); - final FetchResult res = new FetchResult(); --- End diff -- No we can't, since the `res` is returned by the function :-) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r218121426 --- Diff: sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java --- @@ -146,16 +146,11 @@ public UserGroupInformation getHttpUGI() { public synchronized void start() { super.start(); // Initialize and test a connection to the metastore -IMetaStoreClient metastoreClient = null; try { - metastoreClient = new HiveMetaStoreClient(hiveConf); - metastoreClient.getDatabases("default"); -} catch (Exception e) { - throw new ServiceException("Unable to connect to MetaStore!", e); -} -finally { - if (metastoreClient != null) { -metastoreClient.close(); + try (IMetaStoreClient metastoreClient = new HiveMetaStoreClient(hiveConf)) { --- End diff -- Do we need `try` at line 149? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r218117766 --- Diff: common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java --- @@ -143,37 +143,38 @@ public void releaseBuffers() { } private FetchResult fetchChunks(List chunkIndices) throws Exception { -TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); -final Semaphore sem = new Semaphore(0); - final FetchResult res = new FetchResult(); --- End diff -- nit: Can we put this line into `try` since this line was originally executed after `clientFactory.createClient`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r218078332 --- Diff: sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java --- @@ -356,49 +335,45 @@ public void appendRowUntilExceedingPageSize() throws Exception { Assert.assertTrue(checkValue(value1, 1, 1)); } Assert.assertFalse(iterator.next()); -} finally { - batch.close(); } } @Test public void failureToAllocateFirstPage() throws Exception { memoryManager.limit(1024); -RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, -valueSchema, taskMemoryManager, DEFAULT_CAPACITY); -try { +try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, +valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) { UnsafeRow key = makeKeyRow(1, "A"); UnsafeRow value = makeValueRow(11, 11); UnsafeRow ret = appendRow(batch, key, value); Assert.assertNull(ret); Assert.assertFalse(batch.rowIterator().next()); -} finally { - batch.close(); } } @Test public void randomizedTest() { -RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, -valueSchema, taskMemoryManager, DEFAULT_CAPACITY); -int numEntry = 100; -long[] expectedK1 = new long[numEntry]; -String[] expectedK2 = new String[numEntry]; -long[] expectedV1 = new long[numEntry]; -long[] expectedV2 = new long[numEntry]; - -for (int i = 0; i < numEntry; i++) { - long k1 = rand.nextLong(); - String k2 = getRandomString(rand.nextInt(256)); - long v1 = rand.nextLong(); - long v2 = rand.nextLong(); - appendRow(batch, makeKeyRow(k1, k2), makeValueRow(v1, v2)); - expectedK1[i] = k1; - expectedK2[i] = k2; - expectedV1[i] = v1; - expectedV2[i] = v2; -} -try { + --- End diff -- Good one --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r217036427 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java --- @@ -133,37 +133,37 @@ private FetchResult fetchBlocks( final Semaphore requestsRemaining = new Semaphore(0); -ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000); -client.init(APP_ID); -client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, - new BlockFetchingListener() { -@Override -public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - data.retain(); - res.successBlocks.add(blockId); - res.buffers.add(data); - requestsRemaining.release(); -} - } -} - -@Override -public void onBlockFetchFailure(String blockId, Throwable exception) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - res.failedBlocks.add(blockId); - requestsRemaining.release(); -} - } -} - }, null); - -if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) { - fail("Timeout getting response from the server"); +try(ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) { + client.init(APP_ID); + client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, + new BlockFetchingListener() { --- End diff -- The scenario is something like: later, a line or two in this block is changed. That change is cherry-picked to a branch before this change here. It's a merge conflict, and choosing "theirs" overwrites your change here. It would have to be manually merged. If you know that this is all that has happened, sure, not hard. But it relies on the committer figuring that out and not missing another subtle change during the manual merge. That's definitely the most time-consuming part for me. For Spark 3.x vs 2.x I am less worried, and would like a freer hand to make small improvements. Within a major release I am reluctant. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216968925 --- Diff: sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java --- @@ -321,20 +302,18 @@ public void appendRowUntilExceedingCapacity() throws Exception { Assert.assertTrue(checkValue(value1, 1, 1)); } Assert.assertFalse(iterator.next()); -} finally { - batch.close(); } } @Test public void appendRowUntilExceedingPageSize() throws Exception { // Use default size or spark.buffer.pageSize if specified int pageSizeToUse = (int) memoryManager.pageSizeBytes(); -RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, -valueSchema, taskMemoryManager, pageSizeToUse); //enough capacity -try { - UnsafeRow key = makeKeyRow(1, "A"); - UnsafeRow value = makeValueRow(1, 1); +UnsafeRow key = makeKeyRow(1, "A"); --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216968827 --- Diff: sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java --- @@ -293,18 +276,16 @@ public void fixedLengthTest() throws Exception { Assert.assertTrue(checkKey(key3, 33, 33)); Assert.assertTrue(checkValue(value3, 3, 3)); Assert.assertFalse(iterator.next()); -} finally { - batch.close(); } } @Test public void appendRowUntilExceedingCapacity() throws Exception { -RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, -valueSchema, taskMemoryManager, 10); -try { - UnsafeRow key = makeKeyRow(1, "A"); - UnsafeRow value = makeValueRow(1, 1); +UnsafeRow key = makeKeyRow(1, "A"); --- End diff -- Can we put `key` and `value` inside `try`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216968497 --- Diff: sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatchSuite.java --- @@ -356,49 +335,45 @@ public void appendRowUntilExceedingPageSize() throws Exception { Assert.assertTrue(checkValue(value1, 1, 1)); } Assert.assertFalse(iterator.next()); -} finally { - batch.close(); } } @Test public void failureToAllocateFirstPage() throws Exception { memoryManager.limit(1024); -RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, -valueSchema, taskMemoryManager, DEFAULT_CAPACITY); -try { +try (RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, +valueSchema, taskMemoryManager, DEFAULT_CAPACITY)) { UnsafeRow key = makeKeyRow(1, "A"); UnsafeRow value = makeValueRow(11, 11); UnsafeRow ret = appendRow(batch, key, value); Assert.assertNull(ret); Assert.assertFalse(batch.rowIterator().next()); -} finally { - batch.close(); } } @Test public void randomizedTest() { -RowBasedKeyValueBatch batch = RowBasedKeyValueBatch.allocate(keySchema, -valueSchema, taskMemoryManager, DEFAULT_CAPACITY); -int numEntry = 100; -long[] expectedK1 = new long[numEntry]; -String[] expectedK2 = new String[numEntry]; -long[] expectedV1 = new long[numEntry]; -long[] expectedV2 = new long[numEntry]; - -for (int i = 0; i < numEntry; i++) { - long k1 = rand.nextLong(); - String k2 = getRandomString(rand.nextInt(256)); - long v1 = rand.nextLong(); - long v2 = rand.nextLong(); - appendRow(batch, makeKeyRow(k1, k2), makeValueRow(v1, v2)); - expectedK1[i] = k1; - expectedK2[i] = k2; - expectedV1[i] = v1; - expectedV2[i] = v2; -} -try { + --- End diff -- nit: it would be good to delete this brank line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216968270 --- Diff: sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIService.java --- @@ -146,16 +146,11 @@ public UserGroupInformation getHttpUGI() { public synchronized void start() { super.start(); // Initialize and test a connection to the metastore -IMetaStoreClient metastoreClient = null; try { - metastoreClient = new HiveMetaStoreClient(hiveConf); - metastoreClient.getDatabases("default"); -} catch (Exception e) { - throw new ServiceException("Unable to connect to MetaStore!", e); -} -finally { - if (metastoreClient != null) { -metastoreClient.close(); + try (IMetaStoreClient metastoreClient = new HiveMetaStoreClient(hiveConf)) { --- End diff -- Do we need `try` at line 149? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216962458 --- Diff: core/src/test/java/test/org/apache/spark/JavaAPISuite.java --- @@ -997,10 +997,10 @@ public void binaryFiles() throws Exception { FileOutputStream fos1 = new FileOutputStream(file1); -FileChannel channel1 = fos1.getChannel(); -ByteBuffer bbuf = ByteBuffer.wrap(content1); -channel1.write(bbuf); -channel1.close(); +try(FileChannel channel1 = fos1.getChannel()) { --- End diff -- nit: space after `try` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216962519 --- Diff: core/src/test/java/test/org/apache/spark/JavaAPISuite.java --- @@ -1018,10 +1018,10 @@ public void binaryFilesCaching() throws Exception { FileOutputStream fos1 = new FileOutputStream(file1); -FileChannel channel1 = fos1.getChannel(); -ByteBuffer bbuf = ByteBuffer.wrap(content1); -channel1.write(bbuf); -channel1.close(); +try(FileChannel channel1 = fos1.getChannel()) { --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216962424 --- Diff: core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java --- @@ -39,30 +39,27 @@ public void setUp() throws ClassNotFoundException, SQLException { sc = new JavaSparkContext("local", "JavaAPISuite"); Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); -Connection connection = - DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true"); -try { - Statement create = connection.createStatement(); - create.execute( -"CREATE TABLE FOO(" + -"ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + -"DATA INTEGER)"); - create.close(); +try (Connection connection = DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true")) { + + try(Statement create = connection.createStatement()) { +create.execute( +"CREATE TABLE FOO(" + +"ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + +"DATA INTEGER)"); --- End diff -- This one is horrible yes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216962247 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java --- @@ -181,42 +181,43 @@ private void writeSortedFile(boolean isLastFile) { // around this, we pass a dummy no-op serializer. final SerializerInstance ser = DummySerializerInstance.INSTANCE; -final DiskBlockObjectWriter writer = - blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse); - int currentPartition = -1; -final int uaoSize = UnsafeAlignedOffset.getUaoSize(); -while (sortedRecords.hasNext()) { - sortedRecords.loadNext(); - final int partition = sortedRecords.packedRecordPointer.getPartitionId(); - assert (partition >= currentPartition); - if (partition != currentPartition) { -// Switch to the new partition -if (currentPartition != -1) { - final FileSegment fileSegment = writer.commitAndGet(); - spillInfo.partitionLengths[currentPartition] = fileSegment.length(); +final FileSegment committedSegment; +try (final DiskBlockObjectWriter writer = +blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) { + + final int uaoSize = UnsafeAlignedOffset.getUaoSize(); + while (sortedRecords.hasNext()) { +sortedRecords.loadNext(); +final int partition = sortedRecords.packedRecordPointer.getPartitionId(); +assert (partition >= currentPartition); +if (partition != currentPartition) { + // Switch to the new partition + if (currentPartition != -1) { +final FileSegment fileSegment = writer.commitAndGet(); +spillInfo.partitionLengths[currentPartition] = fileSegment.length(); + } + currentPartition = partition; } -currentPartition = partition; - } - final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer(); - final Object recordPage = taskMemoryManager.getPage(recordPointer); - final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer); - int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage); - long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length - while (dataRemaining > 0) { -final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); -Platform.copyMemory( - recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); -writer.write(writeBuffer, 0, toTransfer); -recordReadPosition += toTransfer; -dataRemaining -= toTransfer; +final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer(); +final Object recordPage = taskMemoryManager.getPage(recordPointer); +final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer); +int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage); +long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length +while (dataRemaining > 0) { + final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); + Platform.copyMemory( + recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); --- End diff -- nit: fix indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216960624 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java --- @@ -98,19 +98,19 @@ public void testSortShuffleBlocks() throws IOException { resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER)); -InputStream block0Stream = - resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream(); -String block0 = CharStreams.toString( -new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); -block0Stream.close(); -assertEquals(sortBlock0, block0); - -InputStream block1Stream = - resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream(); -String block1 = CharStreams.toString( -new InputStreamReader(block1Stream, StandardCharsets.UTF_8)); -block1Stream.close(); -assertEquals(sortBlock1, block1); +try(InputStream block0Stream = +resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream()) { + String block0 = CharStreams.toString( + new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); + assertEquals(sortBlock0, block0); +} + +try(InputStream block1Stream = --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216960499 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java --- @@ -98,19 +98,19 @@ public void testSortShuffleBlocks() throws IOException { resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER)); -InputStream block0Stream = - resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream(); -String block0 = CharStreams.toString( -new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); -block0Stream.close(); -assertEquals(sortBlock0, block0); - -InputStream block1Stream = - resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream(); -String block1 = CharStreams.toString( -new InputStreamReader(block1Stream, StandardCharsets.UTF_8)); -block1Stream.close(); -assertEquals(sortBlock1, block1); +try(InputStream block0Stream = --- End diff -- Would it be possible to add a space after `try`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216928976 --- Diff: core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java --- @@ -39,30 +39,27 @@ public void setUp() throws ClassNotFoundException, SQLException { sc = new JavaSparkContext("local", "JavaAPISuite"); Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); -Connection connection = - DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true"); -try { - Statement create = connection.createStatement(); - create.execute( -"CREATE TABLE FOO(" + -"ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + -"DATA INTEGER)"); - create.close(); +try (Connection connection = DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true")) { + + try(Statement create = connection.createStatement()) { +create.execute( +"CREATE TABLE FOO(" + +"ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + +"DATA INTEGER)"); --- End diff -- indentation .. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216928812 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java --- @@ -133,37 +133,37 @@ private FetchResult fetchBlocks( final Semaphore requestsRemaining = new Semaphore(0); -ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000); -client.init(APP_ID); -client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, - new BlockFetchingListener() { -@Override -public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - data.retain(); - res.successBlocks.add(blockId); - res.buffers.add(data); - requestsRemaining.release(); -} - } -} - -@Override -public void onBlockFetchFailure(String blockId, Throwable exception) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - res.failedBlocks.add(blockId); - requestsRemaining.release(); -} - } -} - }, null); - -if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) { - fail("Timeout getting response from the server"); +try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) { + client.init(APP_ID); + client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, + new BlockFetchingListener() { --- End diff -- Shell we stick to 2spaced indentation? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216929186 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java --- @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; --- End diff -- Looks unused. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216928860 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java --- @@ -96,14 +96,14 @@ private void validate(String appId, String secretKey, boolean encrypt) ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true"))); } -ExternalShuffleClient client = - new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000); -client.init(appId); -// Registration either succeeds or throws an exception. -client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0", - new ExecutorShuffleInfo(new String[0], 0, -"org.apache.spark.shuffle.sort.SortShuffleManager")); -client.close(); +try (ExternalShuffleClient client = +new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000)) { + client.init(appId); + // Registration either succeeds or throws an exception. + client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0", + new ExecutorShuffleInfo(new String[0], 0, --- End diff -- ditto for indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216922969 --- Diff: launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java --- @@ -72,11 +74,7 @@ public void stop() { @Override public synchronized void disconnect() { if (connection != null && connection.isOpen()) { - try { -connection.close(); - } catch (IOException ioe) { -// no-op. - } + IOUtils.closeQuietly(connection); --- End diff -- I wan't aware of this requirement, reverted the change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216921874 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java --- @@ -133,37 +133,37 @@ private FetchResult fetchBlocks( final Semaphore requestsRemaining = new Semaphore(0); -ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000); -client.init(APP_ID); -client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, - new BlockFetchingListener() { -@Override -public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - data.retain(); - res.successBlocks.add(blockId); - res.buffers.add(data); - requestsRemaining.release(); -} - } -} - -@Override -public void onBlockFetchFailure(String blockId, Throwable exception) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - res.failedBlocks.add(blockId); - requestsRemaining.release(); -} - } -} - }, null); - -if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) { - fail("Timeout getting response from the server"); +try(ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) { + client.init(APP_ID); + client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, + new BlockFetchingListener() { --- End diff -- If there are merge conflicts, it is easy to pick theirs, and wrap the try-with-resources around it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user Fokko commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216917780 --- Diff: common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java --- @@ -383,7 +383,7 @@ public void testRefWithIntNaturalKey() throws Exception { LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType(); i.key = 1; i.id = "1"; -i.values = Arrays.asList("1"); +i.values = Collections.singletonList("1"); --- End diff -- I agree, I don't really care about moving to `Collections`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216896517 --- Diff: launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java --- @@ -72,11 +74,7 @@ public void stop() { @Override public synchronized void disconnect() { if (connection != null && connection.isOpen()) { - try { -connection.close(); - } catch (IOException ioe) { -// no-op. - } + IOUtils.closeQuietly(connection); --- End diff -- This library should not have any non-JRE dependencies. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216882375 --- Diff: launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java --- @@ -72,11 +74,7 @@ public void stop() { @Override public synchronized void disconnect() { if (connection != null && connection.isOpen()) { - try { -connection.close(); - } catch (IOException ioe) { -// no-op. - } + IOUtils.closeQuietly(connection); --- End diff -- I wouldn't bother with this; we don't really do it consistently, it's not a JDK standard class, and it doesn't really save much, while adding to the dependency on commons/io --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216881904 --- Diff: common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java --- @@ -143,61 +143,62 @@ public void releaseBuffers() { } private FetchResult fetchChunks(List chunkIndices) throws Exception { -TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort()); -final Semaphore sem = new Semaphore(0); - final FetchResult res = new FetchResult(); -res.successChunks = Collections.synchronizedSet(new HashSet()); -res.failedChunks = Collections.synchronizedSet(new HashSet()); -res.buffers = Collections.synchronizedList(new LinkedList()); -ChunkReceivedCallback callback = new ChunkReceivedCallback() { - @Override - public void onSuccess(int chunkIndex, ManagedBuffer buffer) { -buffer.retain(); -res.successChunks.add(chunkIndex); -res.buffers.add(buffer); -sem.release(); - } +try(TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort())) { --- End diff -- Nit: space after try --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216882060 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java --- @@ -133,37 +133,37 @@ private FetchResult fetchBlocks( final Semaphore requestsRemaining = new Semaphore(0); -ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000); -client.init(APP_ID); -client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, - new BlockFetchingListener() { -@Override -public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - data.retain(); - res.successBlocks.add(blockId); - res.buffers.add(data); - requestsRemaining.release(); -} - } -} - -@Override -public void onBlockFetchFailure(String blockId, Throwable exception) { - synchronized (this) { -if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) { - res.failedBlocks.add(blockId); - requestsRemaining.release(); -} - } -} - }, null); - -if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) { - fail("Timeout getting response from the server"); +try(ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) { + client.init(APP_ID); + client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds, + new BlockFetchingListener() { --- End diff -- Indent is now too deep here. I have the same general kind of doubt here.. it's touching a lot of lines for little actual gain. Still, I'd like to be able to improve code a bit here and there. If this is only going to master and Spark 3, the back-port issue lessens, because it's more unlikely to backport from 3.x to 2.x. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216882288 --- Diff: core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java --- @@ -18,12 +18,7 @@ package org.apache.spark.launcher; import java.time.Duration; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; --- End diff -- Don't collapse these --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216882035 --- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java --- @@ -98,19 +98,19 @@ public void testSortShuffleBlocks() throws IOException { resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER)); -InputStream block0Stream = - resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream(); -String block0 = CharStreams.toString( -new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); -block0Stream.close(); -assertEquals(sortBlock0, block0); - -InputStream block1Stream = - resolver.getBlockData("app0", "exec0", 0, 0, 1).createInputStream(); -String block1 = CharStreams.toString( -new InputStreamReader(block1Stream, StandardCharsets.UTF_8)); -block1Stream.close(); -assertEquals(sortBlock1, block1); +try(InputStream block0Stream = + resolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream()) { --- End diff -- Same comment about space above; I'd also indent the continuation 4 spaces for clarity --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216881870 --- Diff: common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java --- @@ -383,7 +383,7 @@ public void testRefWithIntNaturalKey() throws Exception { LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType(); i.key = 1; i.id = "1"; -i.values = Arrays.asList("1"); +i.values = Collections.singletonList("1"); --- End diff -- I don't think this sort of thing is worth changing. I know, IntelliJ suggests it. Unless the mutability is an issue, I'd leave it as a shorter idiom. I wouldn't use static imports here personally. there's also a very small cost of changes in that they create potential merge conflicts for other changes later, so I tend to have a very low but finite minimum bar for value of code scrubbing like this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
Github user TomaszGaweda commented on a diff in the pull request: https://github.com/apache/spark/pull/22399#discussion_r216824283 --- Diff: common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java --- @@ -383,7 +383,7 @@ public void testRefWithIntNaturalKey() throws Exception { LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType(); i.key = 1; i.id = "1"; -i.values = Arrays.asList("1"); +i.values = Collections.singletonList("1"); --- End diff -- If you are already changing this, I would also suggest to add static imports to shorten the code --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22399: [SPARK-25408] Move to mode ideomatic Java8
GitHub user Fokko opened a pull request: https://github.com/apache/spark/pull/22399 [SPARK-25408] Move to mode ideomatic Java8 While working on another PR, I noticed that there is quite some legacy Java in there that can be beautified. For example the use og features from Java8, such as: - Collection libraries - Try-with-resource blocks No code has been changed What are your thoughts on this? This makes code easier to read, and using try-with-resource makes is less likely to forget to close something. ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Fokko/spark SPARK-25408 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22399.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22399 commit 42050b540ce84063fab5d9e490e50e0eac802e94 Author: Fokko Driesprong Date: 2018-09-11T20:37:33Z [SPARK-25408] Move to mode ideomatic Java8 Use features from Java8 such as: - Collection libraries - Try-with-resource blocks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org