Re: [PR] [SPARK-45588][SPARK-45640][SQL][TESTS][3.4] Fix flaky ProtobufCatalystDataConversionSuite [spark]
HeartSaVioR closed pull request #43520: [SPARK-45588][SPARK-45640][SQL][TESTS][3.4] Fix flaky ProtobufCatalystDataConversionSuite
URL: https://github.com/apache/spark/pull/43520
Re: [PR] [SPARK-45588][SPARK-45640][SQL][TESTS][3.4] Fix flaky ProtobufCatalystDataConversionSuite [spark]
HeartSaVioR commented on PR #43520:
URL: https://github.com/apache/spark/pull/43520#issuecomment-1778513431

Thanks! Merging to 3.4.
Re: [PR] [INFRA]Uses different `ORACLE_DOCKER_IMAGE_NAME` in the scheduled builds in other branches [spark]
LuciferYang commented on code in PR #43496:
URL: https://github.com/apache/spark/pull/43496#discussion_r1371147149

## .github/workflows/build_branch34.yml:

```
@@ -37,7 +37,8 @@ jobs:
       envs: >-
         {
           "SCALA_PROFILE": "scala2.13",
-          "PYTHON_TO_TEST": ""
+          "PYTHON_TO_TEST": "",
+          "ORACLE_DOCKER_IMAGE_NAME": "gvenzl/oracle-xe:21.3.0"
```

Review Comment: I tested it in my own repository; the environment variable was not overridden successfully, and I haven't found the reason yet.
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371102347

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:

```
@@ -272,4 +273,39 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }

+  static class ResourceCleaner implements Runnable {
+
+    private final RocksIterator rocksIterator;
+
+    private final AtomicReference<org.rocksdb.RocksDB> _db;
+
+    private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker;
+
+    public ResourceCleaner(RocksIterator rocksIterator,
+        AtomicReference<org.rocksdb.RocksDB> _db,
```

Review Comment: Is it feasible to just pass in `RocksDB` and then retrieve `_db` or `iteratorTracker` when using it?
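For illustration, a minimal sketch of the shape this suggestion points at: the cleaner holds only the owning `RocksDB` and retrieves `_db` / `iteratorTracker` through the accessors added elsewhere in this PR (`getRocksDB()`, `getIteratorTracker()`). A sketch under those assumptions, not the actual patch:

```java
// Sketch: ResourceCleaner keeps a reference to the RocksDB wrapper only.
static class ResourceCleaner implements Runnable {

  private final RocksIterator rocksIterator;

  private final RocksDB db;

  ResourceCleaner(RocksIterator rocksIterator, RocksDB db) {
    this.rocksIterator = rocksIterator;
    this.db = db;
  }

  @Override
  public void run() {
    // Deregister this iterator from the tracker, if it is still registered.
    db.getIteratorTracker().removeIf(ref -> {
      RocksDBIterator<?> tracked = ref.get();
      return tracked != null && rocksIterator.equals(tracked.it);
    });
    // Close the native handle only while the DB is still open; closing a JNI
    // handle against a closed DB can crash the JVM.
    synchronized (db.getRocksDB()) {
      if (db.getRocksDB().get() != null) {
        rocksIterator.close();
      }
    }
  }
}
```

Note that a `j.l.r.Cleaner` action must not hold a reference to the registered `RocksDBIterator` itself, or it would never become phantom reachable; holding the `RocksDB` wrapper avoids that.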
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371100897

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:

```
@@ -272,4 +273,39 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }

+  static class ResourceCleaner implements Runnable {
```

Review Comment: can be a `private record`?
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371100897

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:

```
@@ -272,4 +273,39 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }

+  static class ResourceCleaner implements Runnable {
```

Review Comment: can be a `record`?
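For reference, a sketch of the `record` variant being floated here, with component names copied from the PR. It assumes a JDK with records (Java 16+), which holds for the Java 17 baseline on master; whether a nested `private record` fits the surrounding code is exactly the open question:

```java
// Sketch: the cleaner as a nested record. The canonical constructor and
// final components replace the hand-written fields and constructor.
private record ResourceCleaner(
    RocksIterator rocksIterator,
    AtomicReference<org.rocksdb.RocksDB> _db,
    ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker) implements Runnable {

  @Override
  public void run() {
    // The same cleanup logic as the class-based version goes here.
    iteratorTracker.removeIf(ref -> {
      RocksDBIterator<?> rocksDBIterator = ref.get();
      return rocksDBIterator != null && rocksIterator.equals(rocksDBIterator.it);
    });
  }
}
```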
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371099819

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java:

```
@@ -355,26 +355,21 @@ public void close() throws IOException {
     }
   }

-  /**
-   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
-   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
-   */
-  void closeIterator(RocksDBIterator<?> it) throws IOException {
-    notifyIteratorClosed(it);
-    synchronized (this._db) {
-      org.rocksdb.RocksDB _db = this._db.get();
-      if (_db != null) {
-        it.close();
-      }
-    }
+  public AtomicReference<org.rocksdb.RocksDB> getRocksDB() {
+    return _db;
   }

-  /**
-   * Remove iterator from iterator tracker. `RocksDBIterator` calls it to notify
-   * iterator is closed.
-   */
-  void notifyIteratorClosed(RocksDBIterator<?> it) {
-    iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+  public ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> getIteratorTracker() {
+    return iteratorTracker;
+  }
+
+  public Optional<Reference<RocksDBIterator<?>>> iteratorReference(RocksDBIterator<?> rocksDBIterator) {
+    for (Reference<RocksDBIterator<?>> rocksDBIteratorReference : iteratorTracker) {
+      if (rocksDBIterator == rocksDBIteratorReference.get()) {
+        return Optional.of(rocksDBIteratorReference);
```

Review Comment: It's not necessary to wrap it in an `Optional`; the call site can also check for non-null :)
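A sketch of the non-`Optional` variant being suggested, returning `null` when no reference is tracked so the call site can null-check (names taken from the diff above; illustrative only):

```java
// Sketch: return the tracked reference directly; callers check for null.
Reference<RocksDBIterator<?>> iteratorReference(RocksDBIterator<?> rocksDBIterator) {
  for (Reference<RocksDBIterator<?>> ref : iteratorTracker) {
    if (rocksDBIterator == ref.get()) {
      return ref;
    }
  }
  // No tracked reference; callers test for null instead of Optional.isPresent().
  return null;
}
```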
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371089595

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:

```
@@ -50,6 +56,8 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
     this.ti = db.getTypeInfo(type);
     this.index = ti.index(params.index);
     this.max = params.max;
+    this.cleanable = CLEANER.register(this,
+      new RocksDBIterator.ResourceCleaner(it, db.getRocksDB(), db.getIteratorTracker()));
```

Review Comment: Indentation: 2 spaces
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371088961

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java:

```
@@ -355,26 +355,21 @@ public void close() throws IOException {
     }
   }

-  /**
-   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
-   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
-   */
-  void closeIterator(RocksDBIterator<?> it) throws IOException {
-    notifyIteratorClosed(it);
-    synchronized (this._db) {
-      org.rocksdb.RocksDB _db = this._db.get();
-      if (_db != null) {
-        it.close();
-      }
-    }
+  public AtomicReference<org.rocksdb.RocksDB> getRocksDB() {
+    return _db;
   }

-  /**
-   * Remove iterator from iterator tracker. `RocksDBIterator` calls it to notify
-   * iterator is closed.
-   */
-  void notifyIteratorClosed(RocksDBIterator<?> it) {
-    iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+  public ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> getIteratorTracker() {
+    return iteratorTracker;
+  }
+
+  public Optional<Reference<RocksDBIterator<?>>> iteratorReference(RocksDBIterator<?> rocksDBIterator) {
```

Review Comment: The method name is something I came up with on the spot; there might be a more suitable one.
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371086639

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java:

```
@@ -355,26 +355,21 @@ public void close() throws IOException {
     }
   }

-  /**
-   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
-   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
-   */
-  void closeIterator(RocksDBIterator<?> it) throws IOException {
-    notifyIteratorClosed(it);
-    synchronized (this._db) {
-      org.rocksdb.RocksDB _db = this._db.get();
-      if (_db != null) {
-        it.close();
-      }
-    }
+  public AtomicReference<org.rocksdb.RocksDB> getRocksDB() {
+    return _db;
   }

-  /**
-   * Remove iterator from iterator tracker. `RocksDBIterator` calls it to notify
-   * iterator is closed.
-   */
-  void notifyIteratorClosed(RocksDBIterator<?> it) {
-    iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+  public ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> getIteratorTracker() {
+    return iteratorTracker;
+  }
+
+  public Optional<Reference<RocksDBIterator<?>>> iteratorReference(RocksDBIterator<?> rocksDBIterator) {
```

Review Comment: This method should be annotated with `@VisibleForTesting`.
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371086370

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java:

```
@@ -355,26 +355,21 @@ public void close() throws IOException {
     }
   }

-  /**
-   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
-   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
-   */
-  void closeIterator(RocksDBIterator<?> it) throws IOException {
-    notifyIteratorClosed(it);
-    synchronized (this._db) {
-      org.rocksdb.RocksDB _db = this._db.get();
-      if (_db != null) {
-        it.close();
-      }
-    }
+  public AtomicReference<org.rocksdb.RocksDB> getRocksDB() {
```

Review Comment: please remove `public`

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java:

```
@@ -355,26 +355,21 @@ public void close() throws IOException {
     }
   }

-  /**
-   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
-   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
-   */
-  void closeIterator(RocksDBIterator<?> it) throws IOException {
-    notifyIteratorClosed(it);
-    synchronized (this._db) {
-      org.rocksdb.RocksDB _db = this._db.get();
-      if (_db != null) {
-        it.close();
-      }
-    }
+  public AtomicReference<org.rocksdb.RocksDB> getRocksDB() {
+    return _db;
   }

-  /**
-   * Remove iterator from iterator tracker. `RocksDBIterator` calls it to notify
-   * iterator is closed.
-   */
-  void notifyIteratorClosed(RocksDBIterator<?> it) {
-    iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+  public ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> getIteratorTracker() {
```

Review Comment: ditto

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java:

```
@@ -355,26 +355,21 @@ public void close() throws IOException {
     }
   }

-  /**
-   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
-   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
-   */
-  void closeIterator(RocksDBIterator<?> it) throws IOException {
-    notifyIteratorClosed(it);
-    synchronized (this._db) {
-      org.rocksdb.RocksDB _db = this._db.get();
-      if (_db != null) {
-        it.close();
-      }
-    }
+  public AtomicReference<org.rocksdb.RocksDB> getRocksDB() {
+    return _db;
   }

-  /**
-   * Remove iterator from iterator tracker. `RocksDBIterator` calls it to notify
-   * iterator is closed.
-   */
-  void notifyIteratorClosed(RocksDBIterator<?> it) {
-    iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+  public ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> getIteratorTracker() {
+    return iteratorTracker;
+  }
+
+  public Optional<Reference<RocksDBIterator<?>>> iteratorReference(RocksDBIterator<?> rocksDBIterator) {
```

Review Comment: ditto
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371085761

## common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBIteratorSuite.java:

```
@@ -45,4 +51,37 @@ protected KVStore createStore() throws Exception {
     return db;
   }

+  @Test
```

Review Comment: I think it should also be feasible to put it into `RocksDBSuite`, and then there's no need to redefine `CustomType1`.
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371084374

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:

```
@@ -272,4 +273,39 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }

+  static class ResourceCleaner implements Runnable {
+
+    private final RocksIterator rocksIterator;
+
+    private final AtomicReference<org.rocksdb.RocksDB> _db;
+
+    private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker;
+
+    public ResourceCleaner(RocksIterator rocksIterator,
+        AtomicReference<org.rocksdb.RocksDB> _db,
+        ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker) {
+      this.rocksIterator = rocksIterator;
+      this._db = _db;
+      this.iteratorTracker = iteratorTracker;
+    }
+
+    @Override
+    public void run() {
+      iteratorTracker.removeIf(ref -> {
+        RocksDBIterator<?> rocksDBIterator = ref.get();
+        if (rocksDBIterator != null && rocksIterator.equals(rocksDBIterator.it)) {
```

Review Comment:

```
if (rocksDBIterator != null && rocksIterator.equals(rocksDBIterator.it)) {
  return true;
} else {
  return false;
}
```

->

```
return rocksDBIterator != null && rocksIterator.equals(rocksDBIterator.it);
```

?
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371083658

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:

```
@@ -50,6 +56,8 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
     this.ti = db.getTypeInfo(type);
     this.index = ti.index(params.index);
     this.max = params.max;
+    this.cleanable = CLEANER.register(this,
+      new RocksDBIterator.ResourceCleaner(it, db.getRocksDB(), db.getIteratorTracker()));
```

Review Comment: Could we just pass in `db`?
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
zhaomin1423 commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371083522

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:

```
@@ -272,4 +269,39 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }

+  static class ResourceCleaner implements Runnable {
+
+    private final RocksIterator rocksIterator;
+
+    private final AtomicReference<org.rocksdb.RocksDB> _db;
+
+    private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker;
+
+    public ResourceCleaner(RocksIterator rocksIterator,
+        AtomicReference<org.rocksdb.RocksDB> _db,
+        ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker) {
+      this.rocksIterator = rocksIterator;
+      this._db = _db;
+      this.iteratorTracker = iteratorTracker;
+    }
+
+    @Override
+    public void run() {
+      synchronized (this._db) {
+        org.rocksdb.RocksDB _db = this._db.get();
+        if (_db == null) {
+          return;
+        }
+        rocksIterator.close();
+      }
+      iteratorTracker.removeIf(ref -> {
+        RocksDBIterator<?> rocksDBIterator = ref.get();
+        if (rocksDBIterator != null && rocksIterator.equals(rocksDBIterator.it)) {
+          return true;
+        } else {
+          return false;
+        }
+      });
```

Review Comment: thanks, fixed
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on PR #43502:
URL: https://github.com/apache/spark/pull/43502#issuecomment-1778424888

Let's add a test case first.
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
zhaomin1423 commented on PR #43502:
URL: https://github.com/apache/spark/pull/43502#issuecomment-1778423067

> @zhaomin1423
>
> Firstly, we may need to add two helper methods:
>
> 1. Add a method in `RocksDB` that can get its corresponding `WeakReference` through `RocksDBIterator`; here I assume it's called `iteratorReference`
> 2. Add a method in `RocksDBIterator` to get `RocksIterator`; here I assume it's called `internalIterator`
>
> Then, the test case may look like the following:
>
> ```java
> @Test
> public void testResourceCleaner() throws Exception {
>   File dbPathForCleanerTest = File.createTempFile(
>     "test_db_cleaner.", ".rdb");
>   dbPathForCleanerTest.delete();
>
>   RocksDB dbForCleanerTest = new RocksDB(dbPathForCleanerTest);
>   for (int i = 0; i < 8192; i++) {
>     dbForCleanerTest.write(createCustomType1(i));
>   }
>
>   RocksDBIterator<CustomType1> rocksDBIterator =
>     (RocksDBIterator<CustomType1>) dbForCleanerTest.view(CustomType1.class).iterator();
>   Optional<Reference<RocksDBIterator<CustomType1>>> referenceOpt =
>     dbForCleanerTest.iteratorReference(rocksDBIterator);
>   assertTrue(referenceOpt.isPresent());
>   RocksIterator it = rocksDBIterator.internalIterator();
>   assertTrue(it.isOwningHandle()); // it has not been closed yet, isOwningHandle should be true.
>   rocksDBIterator = null; // Manually set rocksDBIterator to null, to be GCed.
>   Reference<RocksDBIterator<CustomType1>> ref = referenceOpt.get();
>   // After up to 100 GCs, the rocksDBIterator should be GCed.
>   int count = 0;
>   while (count < 100 && !ref.refersTo(null)) {
>     System.gc();
>     count++;
>     Thread.sleep(100);
>   }
>   assertTrue(ref.refersTo(null)); // check that rocksDBIterator has been GCed
>   // Verify that the Cleaner will be executed after a period of time,
>   // and it.isOwningHandle() will become false.
>   assertTimeout(java.time.Duration.ofSeconds(5), () -> assertFalse(it.isOwningHandle()));
>
>   dbForCleanerTest.close();
> }
> ```
>
> Of course, we can also add a state in `ResourceCleaner` to indicate whether it has been executed, initially set to false and turning true after execution. These are just some of my thoughts; there might be better ways to test.

Adding a state is simpler and works for different classes, but I don't know whether adding variables just for testing is a common approach.
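For concreteness, a sketch of the "state in `ResourceCleaner`" idea discussed above: a flag that starts false and flips once `run()` has executed, which a test can poll instead of inspecting the native handle. The `completed` field and `isCompleted()` accessor are hypothetical names, and `java.util.concurrent.atomic.AtomicBoolean` plus Guava's `@VisibleForTesting` are assumed imports:

```java
// Illustrative sketch only: expose whether the cleaner has already run.
static class ResourceCleaner implements Runnable {

  // Starts false; becomes true once run() has executed.
  private final AtomicBoolean completed = new AtomicBoolean(false);

  @Override
  public void run() {
    try {
      // ... close the RocksIterator and deregister it from the tracker ...
    } finally {
      completed.set(true);
    }
  }

  @VisibleForTesting
  boolean isCompleted() {
    return completed.get();
  }
}
```

A test would then GC until the weak reference clears and poll `isCompleted()` rather than asserting on `RocksIterator.isOwningHandle()`.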
Re: [PR] [SPARK-45545][CORE] Pass SSLOptions wherever we create a SparkTransportConf [spark]
hasnain-db commented on PR #43387:
URL: https://github.com/apache/spark/pull/43387#issuecomment-1778412037

CI looks green. Thank you for reviewing, @mridulm!
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371068189

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:

```
@@ -272,4 +269,39 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }

+  static class ResourceCleaner implements Runnable {
+
+    private final RocksIterator rocksIterator;
+
+    private final AtomicReference<org.rocksdb.RocksDB> _db;
+
+    private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker;
+
+    public ResourceCleaner(RocksIterator rocksIterator,
+        AtomicReference<org.rocksdb.RocksDB> _db,
+        ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker) {
+      this.rocksIterator = rocksIterator;
+      this._db = _db;
+      this.iteratorTracker = iteratorTracker;
+    }
+
+    @Override
+    public void run() {
+      synchronized (this._db) {
+        org.rocksdb.RocksDB _db = this._db.get();
+        if (_db == null) {
+          return;
+        }
+        rocksIterator.close();
+      }
+      iteratorTracker.removeIf(ref -> {
+        RocksDBIterator<?> rocksDBIterator = ref.get();
+        if (rocksDBIterator != null && rocksIterator.equals(rocksDBIterator.it)) {
+          return true;
+        } else {
+          return false;
+        }
+      });
```

Review Comment: +1; however, if I understand correctly, there should be no corresponding ref in iteratorTracker at this point.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on PR #42398:
URL: https://github.com/apache/spark/pull/42398#issuecomment-1778406651

cc @cloud-fan @MaxGekk I'm okay with most of this PR. Please help me review it.
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371068189

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:

```
@@ -272,4 +269,39 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }

+  static class ResourceCleaner implements Runnable {
+
+    private final RocksIterator rocksIterator;
+
+    private final AtomicReference<org.rocksdb.RocksDB> _db;
+
+    private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker;
+
+    public ResourceCleaner(RocksIterator rocksIterator,
+        AtomicReference<org.rocksdb.RocksDB> _db,
+        ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker) {
+      this.rocksIterator = rocksIterator;
+      this._db = _db;
+      this.iteratorTracker = iteratorTracker;
+    }
+
+    @Override
+    public void run() {
+      synchronized (this._db) {
+        org.rocksdb.RocksDB _db = this._db.get();
+        if (_db == null) {
+          return;
+        }
+        rocksIterator.close();
+      }
+      iteratorTracker.removeIf(ref -> {
+        RocksDBIterator<?> rocksDBIterator = ref.get();
+        if (rocksDBIterator != null && rocksIterator.equals(rocksDBIterator.it)) {
+          return true;
+        } else {
+          return false;
+        }
+      });
```

Review Comment: +1
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1371068875

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:

```
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }

@ExpressionDescription(
  usage = "_FUNC_(expr) - Returns the concatenated input values," +
    " separated by the delimiter string.",
  examples = """
    Examples:
      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
       a,b,c
      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
       a,b
      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
       a,a
      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
       a,b
      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
       a|b
      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);

  """,
  group = "agg_funcs",
  since = "4.0.0")
case class ListAgg(
    child: Expression,
    delimiter: Expression,
    orderExpression: Expression,
    reverse: Boolean = false,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
  with BinaryLike[Expression] {

  def this(child: Expression) =
    this(child, Literal.create(",", StringType), child, false, 0, 0)
  def this(child: Expression, delimiter: Expression) =
    this(child, delimiter, child, false, 0, 0)

  override def nullable: Boolean = false

  override def dataType: DataType = StringType

  override def left: Expression = child

  override def right: Expression = orderExpression

  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty

  override def withNewMutableAggBufferOffset(
      newMutableAggBufferOffset: Int): ImperativeAggregate =
    copy(mutableAggBufferOffset = newMutableAggBufferOffset)

  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
    copy(inputAggBufferOffset = newInputAggBufferOffset)

  private lazy val sameExpression = orderExpression.semanticEquals(child)

  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))

  override protected lazy val bufferElementType: DataType = {
    if (sameExpression) {
      child.dataType
    } else {
      StructType(Seq(
        StructField("value", child.dataType),
        StructField("sortOrder", orderExpression.dataType)))
    }
  }

  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
    if (buffer.nonEmpty) {
      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
      lazy val sortFunc = (sameExpression, reverse) match {
        case (true, true) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.sorted(ordering.reverse)
        case (true, false) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.sorted(ordering)
        case (false, true) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
            child.dataType))
        case (false, false) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
            child.dataType))
      }
      val sorted = sortFunc(buffer)
      UTF8String.fromString(sorted.map(_.toString)
        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
    } else {
      UTF8String.fromString("")
    }
  }

  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
    val value = child.eval(input)
    if (value != null) {
      val v = if (sameExpression) {
        convertToBufferElement(value)
      } else {
        InternalRow.apply(convertToBufferElement(value),
          convertToBufferElement(orderExpression.eval(input)))
```

Review Comment: cc @cloud-fan @MaxGekk
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
panbingkun commented on PR #43493:
URL: https://github.com/apache/spark/pull/43493#issuecomment-1778403933

> This has to go to 3.5/3.4 as well.
>
> @panbingkun Would you mind filing PRs for backporting this to 3.5 and 3.4? Separate PRs would be preferred. Please also include the change in this PR #43424 - only need to incorporate the change in connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
>
> Thanks in advance!

Done.
Branch-3.4: https://github.com/apache/spark/pull/43520
Branch-3.5: https://github.com/apache/spark/pull/43521
[PR] [SPARK-45588][SPARK-45640][SQL][TESTS][3.5] Fix flaky ProtobufCatalystDataConversionSuite [spark]
panbingkun opened a new pull request, #43521:
URL: https://github.com/apache/spark/pull/43521

### What changes were proposed in this pull request?
This PR aims to fix the flaky ProtobufCatalystDataConversionSuite, including:
- Fix the type check (when the random value was an empty array, we didn't skip it; the original intention is to skip default values for types). [SPARK-45588]
- When `data.get(0)` is null, `data.get(0).asInstanceOf[Array[Byte]].isEmpty` throws a `java.lang.NullPointerException`. [SPARK-45640]

Backport the above to branch-3.5.
Master branch PRs: https://github.com/apache/spark/pull/43424 & https://github.com/apache/spark/pull/43493

### Why are the changes needed?
Fix the flaky ProtobufCatalystDataConversionSuite.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GA
- Manual tests

### Was this patch authored or co-authored using generative AI tooling?
No.
[PR] [SPARK-45588][SPARK-45640][SQL][TESTS][3.4] Fix flaky ProtobufCatalystDataConversionSuite [spark]
panbingkun opened a new pull request, #43520:
URL: https://github.com/apache/spark/pull/43520

### What changes were proposed in this pull request?
This PR aims to fix the flaky ProtobufCatalystDataConversionSuite, including:
- Fix the type check (when the random value was an empty array, we didn't skip it; the original intention is to skip default values for types). [SPARK-45588]
- When `data.get(0)` is null, `data.get(0).asInstanceOf[Array[Byte]].isEmpty` throws a `java.lang.NullPointerException`. [SPARK-45640]

Backport the above to branch-3.4.
Master branch PRs: https://github.com/apache/spark/pull/43424 & https://github.com/apache/spark/pull/43493

### Why are the changes needed?
Fix the flaky ProtobufCatalystDataConversionSuite.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GA
- Manual tests

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
Hisoka-X commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1371026410

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:

```
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }

@ExpressionDescription(
  usage = "_FUNC_(expr) - Returns the concatenated input values," +
    " separated by the delimiter string.",
  examples = """
    Examples:
      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
       a,b,c
      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
       a,b
      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
       a,a
      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
       a,b
      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
       a|b
      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);

  """,
  group = "agg_funcs",
  since = "4.0.0")
case class ListAgg(
    child: Expression,
    delimiter: Expression,
    orderExpression: Expression,
    reverse: Boolean = false,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
  with BinaryLike[Expression] {

  def this(child: Expression) =
    this(child, Literal.create(",", StringType), child, false, 0, 0)
  def this(child: Expression, delimiter: Expression) =
    this(child, delimiter, child, false, 0, 0)

  override def nullable: Boolean = false

  override def dataType: DataType = StringType

  override def left: Expression = child

  override def right: Expression = orderExpression

  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty

  override def withNewMutableAggBufferOffset(
      newMutableAggBufferOffset: Int): ImperativeAggregate =
    copy(mutableAggBufferOffset = newMutableAggBufferOffset)

  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
    copy(inputAggBufferOffset = newInputAggBufferOffset)

  private lazy val sameExpression = orderExpression.semanticEquals(child)

  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))

  override protected lazy val bufferElementType: DataType = {
    if (sameExpression) {
      child.dataType
    } else {
      StructType(Seq(
        StructField("value", child.dataType),
        StructField("sortOrder", orderExpression.dataType)))
    }
  }

  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
    if (buffer.nonEmpty) {
      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
      lazy val sortFunc = (sameExpression, reverse) match {
        case (true, true) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.sorted(ordering.reverse)
        case (true, false) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.sorted(ordering)
        case (false, true) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
            child.dataType))
        case (false, false) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
            child.dataType))
      }
      val sorted = sortFunc(buffer)
      UTF8String.fromString(sorted.map(_.toString)
        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
    } else {
      UTF8String.fromString("")
    }
  }

  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
    val value = child.eval(input)
    if (value != null) {
      val v = if (sameExpression) {
        convertToBufferElement(value)
      } else {
        InternalRow.apply(convertToBufferElement(value),
          convertToBufferElement(orderExpression.eval(input)))
```

Review Comment: This one cannot be changed; we must save the value into the buffer after executing `InternalRow.copyValue`. Please refer to https://github.com/apache/spark/pull/42398#discussion_r1349839317 . Without this, the values in the buffer always end up equal to the last row's value.

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:

```
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset =
```
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
beliefer commented on code in PR #43502:
URL: https://github.com/apache/spark/pull/43502#discussion_r1371011690

## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java:

```
@@ -272,4 +269,39 @@ static int compare(byte[] a, byte[] b) {
     return a.length - b.length;
   }

+  static class ResourceCleaner implements Runnable {
+
+    private final RocksIterator rocksIterator;
+
+    private final AtomicReference<org.rocksdb.RocksDB> _db;
+
+    private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker;
+
+    public ResourceCleaner(RocksIterator rocksIterator,
+        AtomicReference<org.rocksdb.RocksDB> _db,
+        ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker) {
+      this.rocksIterator = rocksIterator;
+      this._db = _db;
+      this.iteratorTracker = iteratorTracker;
+    }
+
+    @Override
+    public void run() {
+      synchronized (this._db) {
+        org.rocksdb.RocksDB _db = this._db.get();
+        if (_db == null) {
+          return;
+        }
+        rocksIterator.close();
+      }
+      iteratorTracker.removeIf(ref -> {
+        RocksDBIterator<?> rocksDBIterator = ref.get();
+        if (rocksDBIterator != null && rocksIterator.equals(rocksDBIterator.it)) {
+          return true;
+        } else {
+          return false;
+        }
+      });
```

Review Comment: It seems we should remove the reference from iteratorTracker first, then close rocksIterator?
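For clarity, a sketch of the reordering being suggested: deregister from `iteratorTracker` first, then close the native iterator while holding the DB lock (a sketch of the idea, not the merged code):

```java
@Override
public void run() {
  // 1. Remove this iterator's reference from the tracker first, so a concurrent
  //    RocksDB.close() no longer sees it.
  iteratorTracker.removeIf(ref -> {
    RocksDBIterator<?> rocksDBIterator = ref.get();
    return rocksDBIterator != null && rocksIterator.equals(rocksDBIterator.it);
  });
  // 2. Then close the JNI handle, but only while the DB is still open.
  synchronized (this._db) {
    if (this._db.get() != null) {
      rocksIterator.close();
    }
  }
}
```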
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1370996202

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala:

```
@@ -478,6 +478,7 @@ object FunctionRegistry {
     expression[Percentile]("percentile"),
     expression[Median]("median"),
     expression[Skewness]("skewness"),
+    expression[ListAgg]("listagg"),
```

Review Comment: Because `ListAgg` extends `Collect` now, please put it together with `CollectList`.

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:

```
@@ -245,3 +249,115 @@ case class CollectTopK(
   override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CollectTopK =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 }

@ExpressionDescription(
  usage = "_FUNC_(expr) - Returns the concatenated input values," +
    " separated by the delimiter string.",
  examples = """
    Examples:
      > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
       a,b,c
      > SELECT _FUNC_(col) FROM VALUES (NULL), ('a'), ('b') AS tab(col);
       a,b
      > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
       a,a
      > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col);
       a,b
      > SELECT _FUNC_(col, '|') FROM VALUES ('a'), ('b') AS tab(col);
       a|b
      > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);

  """,
  group = "agg_funcs",
  since = "4.0.0")
case class ListAgg(
    child: Expression,
    delimiter: Expression,
    orderExpression: Expression,
    reverse: Boolean = false,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0) extends Collect[mutable.ArrayBuffer[Any]]
  with BinaryLike[Expression] {

  def this(child: Expression) =
    this(child, Literal.create(",", StringType), child, false, 0, 0)
  def this(child: Expression, delimiter: Expression) =
    this(child, delimiter, child, false, 0, 0)

  override def nullable: Boolean = false

  override def dataType: DataType = StringType

  override def left: Expression = child

  override def right: Expression = orderExpression

  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty

  override def withNewMutableAggBufferOffset(
      newMutableAggBufferOffset: Int): ImperativeAggregate =
    copy(mutableAggBufferOffset = newMutableAggBufferOffset)

  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
    copy(inputAggBufferOffset = newInputAggBufferOffset)

  private lazy val sameExpression = orderExpression.semanticEquals(child)

  override protected def convertToBufferElement(value: Any): Any = InternalRow.copyValue(value)
  override def defaultResult: Option[Literal] = Option(Literal.create("", StringType))

  override protected lazy val bufferElementType: DataType = {
    if (sameExpression) {
      child.dataType
    } else {
      StructType(Seq(
        StructField("value", child.dataType),
        StructField("sortOrder", orderExpression.dataType)))
    }
  }

  override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
    if (buffer.nonEmpty) {
      val ordering = PhysicalDataType.ordering(orderExpression.dataType)
      lazy val sortFunc = (sameExpression, reverse) match {
        case (true, true) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.sorted(ordering.reverse)
        case (true, false) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.sorted(ordering)
        case (false, true) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]].reverse).map(_.get(0,
            child.dataType))
        case (false, false) => (buffer: mutable.ArrayBuffer[Any]) =>
          buffer.asInstanceOf[mutable.ArrayBuffer[InternalRow]].sortBy(_.get(1,
            orderExpression.dataType))(ordering.asInstanceOf[Ordering[AnyRef]]).map(_.get(0,
            child.dataType))
      }
      val sorted = sortFunc(buffer)
      UTF8String.fromString(sorted.map(_.toString)
        .mkString(delimiter.eval().asInstanceOf[UTF8String].toString))
    } else {
      UTF8String.fromString("")
    }
  }

  override def update(buffer: ArrayBuffer[Any], input: InternalRow): ArrayBuffer[Any] = {
    val value = child.eval(input)
    if (value != null) {
      val v = if (sameExpression) {
        convertToBufferElement(value)
      } else {
        InternalRow.apply(convertToBufferElement(value),
          convertToBufferElement(orderExpression.eval(input)))
```

Review Comment: `InternalRow.apply(value, orderExpression.eval(input))`

##
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
panbingkun commented on PR #43493:
URL: https://github.com/apache/spark/pull/43493#issuecomment-1778340888

> Would you mind filing PRs for backporting this to 3.5 and 3.4? Separate PRs would be preferred. Please also include the change in this PR #43424 - only need to incorporate the change in connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
>
> Thanks in advance!

Let me do it right away.
Thanks to everyone for reviewing and helping test: @rangadi @HeartSaVioR @MaxGekk @HyukjinKwon @LuciferYang
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
beliefer commented on code in PR #42398:
URL: https://github.com/apache/spark/pull/42398#discussion_r1369777027

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:

```
@@ -36,8 +39,7 @@ import org.apache.spark.util.BoundedPriorityQueue
  * We have to store all the collected elements in memory, and so notice that too many elements
  * can cause GC paused and eventually OutOfMemory Errors.
  */
-abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T]
-  with UnaryLike[Expression] {
+abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] {
```

Review Comment: The change to `Collect` looks good now.
Re: [PR] [SPARK-45545][CORE] Pass SSLOptions wherever we create a SparkTransportConf [spark]
mridulm commented on code in PR #43387:
URL: https://github.com/apache/spark/pull/43387#discussion_r1370995264

## core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala:

```
@@ -38,11 +38,20 @@ import org.apache.spark.internal.config.Network
 import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.network.ssl.SslSampleConfigs
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.storage.{BlockId, ShuffleBlockId}
 import org.apache.spark.util.ThreadUtils

 class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar with Matchers {
+
+  private def sparkConfWithSsl(): SparkConf = {
+    val conf = new SparkConf()
+    val updatedConfigs = SslSampleConfigs.createDefaultConfigMap()
+    updatedConfigs.entrySet().forEach(entry => conf.set(entry.getKey, entry.getValue))
```

Review Comment: Sounds good
[PR] [SPARK-45656][SQL] Fix observation when named observations with the same name on different datasets [spark]
ueshin opened a new pull request, #43519:
URL: https://github.com/apache/spark/pull/43519

### What changes were proposed in this pull request?

Fixes the observation result when named observations with the same name are used on different datasets.

### Why are the changes needed?

Currently, if there are observations with the same name on different datasets, one of them will be overwritten by the other execution. For example:

```py
>>> observation1 = Observation("named")
>>> df1 = spark.range(50)
>>> observed_df1 = df1.observe(observation1, count(lit(1)).alias("cnt"))
>>>
>>> observation2 = Observation("named")
>>> df2 = spark.range(100)
>>> observed_df2 = df2.observe(observation2, count(lit(1)).alias("cnt"))
>>>
>>> observed_df1.collect()
...
>>> observed_df2.collect()
...
>>> observation1.get
{'cnt': 50}
>>> observation2.get
{'cnt': 50}
```

`observation2` should return `{'cnt': 100}`.

### Does this PR introduce _any_ user-facing change?

Yes, observations with the same name will now work if they observe different datasets.

### How was this patch tested?

Added the related tests.

### Was this patch authored or co-authored using generative AI tooling?

No.
Re: [PR] [SPARK-44443][SQL] Use PartitionEvaluator API in CoGroupExec, DeserializeToObjectExec, ExternalRDDScanExec [spark]
github-actions[bot] closed pull request #42017: [SPARK-44443][SQL] Use PartitionEvaluator API in CoGroupExec, DeserializeToObjectExec, ExternalRDDScanExec
URL: https://github.com/apache/spark/pull/42017
Re: [PR] [SPARK-45648][INFRA] Add `sql/api` and `common/utils` to `modules.py` [spark]
HyukjinKwon closed pull request #43501: [SPARK-45648][INFRA] Add `sql/api` and `common/utils` to `modules.py`
URL: https://github.com/apache/spark/pull/43501
Re: [PR] [SPARK-45648][INFRA] Add `sql/api` and `common/utils` to `modules.py` [spark]
HyukjinKwon commented on PR #43501:
URL: https://github.com/apache/spark/pull/43501#issuecomment-1778263671

Merged to master.
Re: [PR] [SPARK-45655][SS] Mark CurrentBatchTimestamp as deterministic [spark]
HeartSaVioR commented on code in PR #43517: URL: https://github.com/apache/spark/pull/43517#discussion_r1370931246 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala: ## @@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { ) } + test("SPARK-45655: test current batch timestamp in streaming query metrics") { +import testImplicits._ + +val inputData = MemoryStream[Timestamp] + +val query = inputData.toDF() + .filter("value < current_date()") + .observe("metrics", count(expr("value >= current_date()")).alias("dropped")) + .writeStream + .queryName("ts_metrics_test") + .format("memory") + .outputMode("append") + .start() + +val timeNow = Instant.now() + +// this value would be accepted by the filter and would not count towards +// dropped metrics. +val validValue = Timestamp.from(timeNow.minus(2, ChronoUnit.DAYS)) +inputData.addData(validValue) + +// would be dropped by the filter and count towards dropped metrics +inputData.addData(Timestamp.from(timeNow.plus(2, ChronoUnit.DAYS))) + +query.processAllAvailable() + +val dropped = query.recentProgress.map(p => { Review Comment: nit: style, you can omit ( and ) via changing this to `{ p =>` ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala: ## @@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { ) } + test("SPARK-45655: test current batch timestamp in streaming query metrics") { +import testImplicits._ + +val inputData = MemoryStream[Timestamp] + +val query = inputData.toDF() + .filter("value < current_date()") + .observe("metrics", count(expr("value >= current_date()")).alias("dropped")) + .writeStream + .queryName("ts_metrics_test") + .format("memory") + .outputMode("append") + .start() + +val timeNow = Instant.now() + +// this value would be accepted by the filter and would not count towards +// dropped metrics. +val validValue = Timestamp.from(timeNow.minus(2, ChronoUnit.DAYS)) +inputData.addData(validValue) + +// would be dropped by the filter and count towards dropped metrics +inputData.addData(Timestamp.from(timeNow.plus(2, ChronoUnit.DAYS))) + +query.processAllAvailable() + +val dropped = query.recentProgress.map(p => { + val metricVal = Option(p.observedMetrics.get("metrics")) + metricVal.map(_.getLong(0)).getOrElse(0L) +}).sum + +query.stop() Review Comment: nit: Let's make related code be near to each other. ``` query.processAllAvailable() query.stop() val dropped = ...blabla... assert(dropped == 1) val data = ...blabla... assert(data(0).getAs[Timestamp](0).equals(validValue)) ``` ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala: ## @@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { ) } + test("SPARK-45655: test current batch timestamp in streaming query metrics") { Review Comment: Let's make it more explicit - Use current batch timestamp in observe API. We can omit streaming query as the test suite is for streaming query. 
## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala: ## @@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { ) } + test("SPARK-45655: test current batch timestamp in streaming query metrics") { +import testImplicits._ + +val inputData = MemoryStream[Timestamp] + +val query = inputData.toDF() + .filter("value < current_date()") Review Comment: Let's add a code comment that current_date() internally uses the current batch timestamp in a streaming query. Many people may not be aware of the relationship between current_date() and the current batch timestamp.
[PR] [WIP] [spark]
zeruibao opened a new pull request, #43518: URL: https://github.com/apache/spark/pull/43518 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-45622][BUILD] java -target should use java.version instead of 17 [spark]
HyukjinKwon closed pull request #43476: [SPARK-45622][BUILD] java -target should use java.version instead of 17 URL: https://github.com/apache/spark/pull/43476
Re: [PR] [SPARK-45651][BUILD] Publish snapshot manually [spark]
HyukjinKwon closed pull request #43512: [SPARK-45651][BUILD] Publish snapshot manually URL: https://github.com/apache/spark/pull/43512
Re: [PR] [SPARK-45651][BUILD] Publish snapshot manually [spark]
HyukjinKwon commented on PR #43512: URL: https://github.com/apache/spark/pull/43512#issuecomment-1778247912 Merged to master.
Re: [PR] [SPARK-45651][Build] Publish snapshot manually [spark]
HyukjinKwon commented on code in PR #43512: URL: https://github.com/apache/spark/pull/43512#discussion_r1370948876 ## .github/workflows/publish_snapshot.yml: ## @@ -30,11 +37,8 @@ jobs: strategy: fail-fast: false matrix: -branch: - - master - - branch-3.5 - - branch-3.4 - - branch-3.3 +# keep in sync with default value of workflow_dispatch input 'branch' +branch: ${{ fromJSON( inputs.branch || '["master", "branch-3.5", "branch-3.4", "branch-3.3"]' ) }} Review Comment: Hm, I don't think this helps. Each individual job within the matrix has an isolated docker container to run the CI.
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
HeartSaVioR commented on PR #43493: URL: https://github.com/apache/spark/pull/43493#issuecomment-1778245180 This has to go to 3.5/3.4 as well. @panbingkun Would you mind filing PRs to backport to 3.5 and 3.4? Separate PRs would be preferred. Please also include the change from this PR: https://github.com/apache/spark/pull/43424 - you only need to incorporate the change in connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala. Thanks in advance!
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
rangadi commented on PR #43493: URL: https://github.com/apache/spark/pull/43493#issuecomment-1778240732 Thanks @HyukjinKwon
Re: [PR] [SPARK-45554][PYTHON] Introduce flexible parameter to `assertSchemaEqual` [spark]
itholic commented on PR #43450: URL: https://github.com/apache/spark/pull/43450#issuecomment-1778236396 CI passed on this one as well. Gentle reminder for @HyukjinKwon; also cc @ueshin @zhengruifeng.
Re: [PR] [SPARK-45552][PS] Introduce flexible parameters to `assertDataFrameEqual` [spark]
itholic commented on PR #43433: URL: https://github.com/apache/spark/pull/43433#issuecomment-1778234604 Gentle reminder for @HyukjinKwon as CI passed. Also cc @ueshin @zhengruifeng
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
HyukjinKwon closed pull request #43493: [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite URL: https://github.com/apache/spark/pull/43493
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
HyukjinKwon commented on PR #43493: URL: https://github.com/apache/spark/pull/43493#issuecomment-1778228481 Merged to master
Re: [PR] [SPARK-45651][Build] Log memory usage of publish snapshot workflow [spark]
HyukjinKwon commented on PR #43513: URL: https://github.com/apache/spark/pull/43513#issuecomment-1778227490 If this is just for debugging purposes, I would recommend doing this within your fork first ...
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
rangadi commented on PR #43493: URL: https://github.com/apache/spark/pull/43493#issuecomment-1778217161 @HeartSaVioR yes, backporting will be good.
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
HeartSaVioR commented on PR #43493: URL: https://github.com/apache/spark/pull/43493#issuecomment-1778201842 @rangadi I just noted that the seeded test suite has existed since 3.4. I guess you've also fixed some of the test issues in another PR. Should we backport that PR as well?
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
rangadi commented on PR #43493: URL: https://github.com/apache/spark/pull/43493#issuecomment-1778188720 @HeartSaVioR I have run this fix with seed values from 0 to 1. All were successful. Could you merge this?
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
rangadi commented on code in PR #43493: URL: https://github.com/apache/spark/pull/43493#discussion_r1370911418 ## connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala: ## @@ -138,6 +138,7 @@ class ProtobufCatalystDataConversionSuite data != null && (data.get(0) == defaultValue || (dt.fields(0).dataType == BinaryType && +data.get(0) != null && Review Comment: Ignore this suggestion. `data.get(0) != null` is correct. It allows testing null.
[PR] [SPARK-45655] Mark CurrentBatchTimestamp as deterministic. [spark]
sahnib opened a new pull request, #43517: URL: https://github.com/apache/spark/pull/43517 ### What changes were proposed in this pull request? This PR removes the `NonDeterministic` trait from the `CurrentBatchTimestamp` expression. `CurrentBatchTimestamp` is deterministic: its value is decided in the offset log during the WAL commit and won't change in case of a retry. ### Why are the changes needed? Streaming queries do not support current_date() inside CollectMetrics. The primary reason is that current_date() (which resolves to CurrentBatchTimestamp) is marked as non-deterministic. However, current_date and current_timestamp are both deterministic today, and current_batch_timestamp should be the same. As an example, the query below fails (without this change) due to the observe call on the DataFrame.

```scala
val inputData = MemoryStream[Timestamp]

inputData.toDF()
  .filter("value < current_date()")
  .observe("metrics", count(expr("value >= current_date()")).alias("dropped"))
  .writeStream
  .queryName("ts_metrics_test")
  .format("memory")
  .outputMode("append")
  .start()
```

### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test cases added. ### Was this patch authored or co-authored using generative AI tooling? No
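For completeness, a hedged sketch of how the observed metric can then be read back (assuming the `query` started in the example above; `observedMetrics` is keyed by the name passed to `observe`, and `getLong(0)` assumes `dropped` is the first observed column):

```scala
query.processAllAvailable()

// Sum the "dropped" counter across all reported progress updates;
// progress entries without the "metrics" observation are skipped.
val dropped = query.recentProgress.flatMap { p =>
  Option(p.observedMetrics.get("metrics")).map(_.getLong(0))
}.sum

query.stop()
```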
Re: [PR] [SPARK-45504][SS] Lower CPU Priority Of RocksDB Background Threads [spark]
siying commented on code in PR #43339: URL: https://github.com/apache/spark/pull/43339#discussion_r1370819296 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -109,6 +109,8 @@ class RocksDB( dbOptions.setCreateIfMissing(true) dbOptions.setTableFormatConfig(tableFormatConfig) dbOptions.setMaxOpenFiles(conf.maxOpenFiles) + dbOptions.getEnv().lowerThreadPoolCPUPriority(Priority.HIGH) Review Comment: This cannot be adjusted up or down at will. Once the priority of a thread is lowered, there is no way to move it back up.
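For context, a minimal sketch of the RocksJava pattern being discussed (illustrative only, using the `Env` API from the hunk above; note the one-way semantics described in the comment):

```scala
import org.rocksdb.{DBOptions, Priority, RocksDB}

RocksDB.loadLibrary()
val dbOptions = new DBOptions()
// Lower the CPU priority of the high-priority (flush) and low-priority
// (compaction) background thread pools. This is one-way: once lowered,
// there is no API to raise the priority back.
dbOptions.getEnv().lowerThreadPoolCPUPriority(Priority.HIGH)
dbOptions.getEnv().lowerThreadPoolCPUPriority(Priority.LOW)
```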
Re: [PR] [SPARK-45504][SS] Lower CPU Priority Of RocksDB Background Threads [spark]
siying closed pull request #43339: [SPARK-45504][SS] Lower CPU Priority Of RocksDB Background Threads URL: https://github.com/apache/spark/pull/43339
Re: [PR] [SPARK-45504][SS] Lower CPU Priority Of RocksDB Background Threads [spark]
siying commented on PR #43339: URL: https://github.com/apache/spark/pull/43339#issuecomment-1778049354 The benchmark results are not decisive. They sometimes show P99 improvements, but P50 tends to regress. Closing it for now; will reopen if things change.
[PR] [SPARK-45654][PYTHON] Add Python data source write API [spark]
allisonwang-db opened a new pull request, #43516: URL: https://github.com/apache/spark/pull/43516 ### What changes were proposed in this pull request? This PR adds the Python data source write API and a `DataSourceWriter` class in `datasource.py`. ### Why are the changes needed? To support Python data source writes. ### Does this PR introduce _any_ user-facing change? No. This PR alone does not introduce any user-facing change. ### How was this patch tested? Unit tests ### Was this patch authored or co-authored using generative AI tooling? No
Re: [PR] [SPARK-45545][CORE] Pass SSLOptions wherever we create a SparkTransportConf [spark]
hasnain-db commented on code in PR #43387: URL: https://github.com/apache/spark/pull/43387#discussion_r1370802419 ## core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala: ## @@ -38,11 +38,20 @@ import org.apache.spark.internal.config.Network import org.apache.spark.network.{BlockDataManager, BlockTransferService} import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.shuffle.BlockFetchingListener +import org.apache.spark.network.ssl.SslSampleConfigs import org.apache.spark.serializer.{JavaSerializer, SerializerManager} import org.apache.spark.storage.{BlockId, ShuffleBlockId} import org.apache.spark.util.ThreadUtils class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar with Matchers { + + private def sparkConfWithSsl(): SparkConf = { +val conf = new SparkConf() +val updatedConfigs = SslSampleConfigs.createDefaultConfigMap() +updatedConfigs.entrySet().forEach(entry => conf.set(entry.getKey, entry.getValue)) Review Comment: @mridulm I took a look but unfortunately I can't -- `TestUtils` is in the main codebase, not the test codebase. `SslSampleConfigs` is test only (See https://github.com/apache/spark/blob/48e207f4a2192d474f2c0f141b984ef0c36a78c3/core/src/main/scala/org/apache/spark/TestUtils.scala#L60-L66) But I created an `SslTestUtils` and used it everywhere except yarn.
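A hypothetical sketch of what such a test-only helper could look like (the `SslTestUtils` name comes from the comment above; the method name and shape here are assumptions, not the PR's actual code):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.network.ssl.SslSampleConfigs

object SslTestUtils {
  // Fold the sample SSL configs into a SparkConf, mirroring the loop in the quoted hunk.
  def updateWithSSLConfig(conf: SparkConf): SparkConf = {
    SslSampleConfigs.createDefaultConfigMap().forEach((k, v) => conf.set(k, v))
    conf
  }
}
```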
Re: [PR] [SPARK-24497][SQL] Support recursive SQL [spark]
milimetric commented on PR #40744: URL: https://github.com/apache/spark/pull/40744#issuecomment-1778014631 @peter-toth thank you so much for sticking with this over three major versions and three separate pull requests. Recursive queries would be really nice to have in Spark SQL.
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
HeartSaVioR commented on PR #43493: URL: https://github.com/apache/spark/pull/43493#issuecomment-1777987275 Please run the test suite with all combinations of available seeds and testing types, and make sure all of them pass. Let's fix all the broken-but-not-caught cases.
Re: [PR] [SPARK-45503][SS] Add Conf to Set RocksDB Compression [spark]
HeartSaVioR closed pull request #43338: [SPARK-45503][SS] Add Conf to Set RocksDB Compression URL: https://github.com/apache/spark/pull/43338
Re: [PR] [SPARK-45503][SS] Add Conf to Set RocksDB Compression [spark]
HeartSaVioR commented on PR #43338: URL: https://github.com/apache/spark/pull/43338#issuecomment-1777981673 Thanks! Merging to master.
[PR] Ranger spark integration v3.4 lyft [spark]
laguilarlyft opened a new pull request, #43515: URL: https://github.com/apache/spark/pull/43515 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] Ranger spark integration v3.4 lyft [spark]
laguilarlyft closed pull request #43515: Ranger spark integration v3.4 lyft URL: https://github.com/apache/spark/pull/43515
Re: [PR] [SPARK-45523][Python] Refactor the null-checking to have shortcuts [spark]
ueshin closed pull request #43492: [SPARK-45523][Python] Refactor the null-checking to have shortcuts URL: https://github.com/apache/spark/pull/43492
Re: [PR] [SPARK-45523][Python] Refactor the null-checking to have shortcuts [spark]
ueshin commented on PR #43492: URL: https://github.com/apache/spark/pull/43492#issuecomment-1777963132 The failed tests seem unrelated to this PR. Let me merge this now.
Re: [PR] [SPARK-45523][Python] Refactor the null-checking to have shortcuts [spark]
ueshin commented on PR #43492: URL: https://github.com/apache/spark/pull/43492#issuecomment-1777963392 Thanks. Merging to master.
Re: [PR] [SPARK-45640][SQL][TESTS] Fix flaky ProtobufCatalystDataConversionSuite [spark]
rangadi commented on code in PR #43493: URL: https://github.com/apache/spark/pull/43493#discussion_r1370711411 ## connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala: ## @@ -138,6 +138,7 @@ class ProtobufCatalystDataConversionSuite data != null && (data.get(0) == defaultValue || (dt.fields(0).dataType == BinaryType && +data.get(0) != null && Review Comment: We should allow a null value for the field, i.e. change this to: ```scala (data.get(0) == null || data.get(0).asInstanceOf[Array[Byte]].isEmpty) ``` I will test on my side to ensure this test stays stable with a range of `seed` values. cc: @HeartSaVioR
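Combining the quoted hunk with this suggestion, the full regeneration predicate would read roughly as follows (a sketch assembled from the review context, not the merged diff):

```scala
// Retry data generation while the value collapses to the default, or,
// for a binary field, while it is null or empty (which encode identically).
data != null &&
  (data.get(0) == defaultValue ||
    (dt.fields(0).dataType == BinaryType &&
      (data.get(0) == null ||
        data.get(0).asInstanceOf[Array[Byte]].isEmpty)))
```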
Re: [PR] [SPARK-45653] Refractor XmlSuite to allow other test suites to easily extend and override [spark]
shujingyang-db closed pull request #43300: [SPARK-45653] Refractor XmlSuite to allow other test suites to easily extend and override URL: https://github.com/apache/spark/pull/43300
Re: [PR] [SPARK-45503][SS] Add Conf to Set RocksDB Compression [spark]
siying commented on PR #43338: URL: https://github.com/apache/spark/pull/43338#issuecomment-1777847399 Tests all passed now. CC @HeartSaVioR
[PR] Ranger spark integration [spark]
laguilarlyft opened a new pull request, #43514: URL: https://github.com/apache/spark/pull/43514 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] Ranger spark integration [spark]
laguilarlyft closed pull request #43514: Ranger spark integration URL: https://github.com/apache/spark/pull/43514
Re: [PR] [SPARK-45527][core] Use fraction to do the resource calculation [spark]
tgravescs commented on PR #43494: URL: https://github.com/apache/spark/pull/43494#issuecomment-1777803901 I'll take a look, might not get to it today.
Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]
ueshin closed pull request #43360: [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API URL: https://github.com/apache/spark/pull/43360
Re: [PR] [SPARK-45524][PYTHON][SQL] Initial support for Python data source read API [spark]
ueshin commented on PR #43360: URL: https://github.com/apache/spark/pull/43360#issuecomment-193417 Thanks! Merging to master.
Re: [PR] [SPARK-45643][CORE][SQL] Replace `s.c.mutable.MapOps#transform` with `s.c.mutable.MapOps#mapValuesInPlace` [spark]
LuciferYang commented on PR #43500: URL: https://github.com/apache/spark/pull/43500#issuecomment-1777633925 Thanks @dongjoon-hyun and @beliefer
Re: [PR] [SPARK-45643][CORE][SQL] Replace `s.c.mutable.MapOps#transform` with `s.c.mutable.MapOps#mapValuesInPlace` [spark]
dongjoon-hyun commented on PR #43500: URL: https://github.com/apache/spark/pull/43500#issuecomment-1777631433 Merged to master~
Re: [PR] [SPARK-45643][CORE][SQL] Replace `s.c.mutable.MapOps#transform` with `s.c.mutable.MapOps#mapValuesInPlace` [spark]
dongjoon-hyun closed pull request #43500: [SPARK-45643][CORE][SQL] Replace `s.c.mutable.MapOps#transform` with `s.c.mutable.MapOps#mapValuesInPlace` URL: https://github.com/apache/spark/pull/43500
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
LuciferYang commented on PR #43502: URL: https://github.com/apache/spark/pull/43502#issuecomment-1777630300 @zhaomin1423 Firstly, we may need to add two helper methods: 1. Add a method in `RocksDB` that can get the corresponding `WeakReference` for a given `RocksDBIterator`; here I assume it's called `iteratorReference`. 2. Add a method in `RocksDBIterator` to get the underlying `RocksIterator`; here I assume it's called `internalIterator`. Then the test case may look like the following:

```java
@Test
public void testResourceCleaner() throws Exception {
  File dbPathForCleanerTest = File.createTempFile("test_db_cleaner.", ".rdb");
  dbPathForCleanerTest.delete();
  RocksDB dbForCleanerTest = new RocksDB(dbPathForCleanerTest);
  for (int i = 0; i < 8192; i++) {
    dbForCleanerTest.write(createCustomType1(i));
  }
  RocksDBIterator<CustomType1> rocksDBIterator =
    (RocksDBIterator<CustomType1>) dbForCleanerTest.view(CustomType1.class).iterator();
  Optional<Reference<RocksDBIterator<?>>> referenceOpt =
    dbForCleanerTest.iteratorReference(rocksDBIterator);
  assertTrue(referenceOpt.isPresent());
  RocksIterator it = rocksDBIterator.internalIterator();
  // The iterator has not been closed yet, so isOwningHandle should be true.
  assertTrue(it.isOwningHandle());
  rocksDBIterator = null; // Clear the only strong reference so it can be GC'd.
  Reference<RocksDBIterator<?>> ref = referenceOpt.get();
  // Trigger GC up to 100 times; the rocksDBIterator should be collected.
  int count = 0;
  while (count < 100 && !ref.refersTo(null)) {
    System.gc();
    count++;
    Thread.sleep(100);
  }
  assertTrue(ref.refersTo(null)); // Check that rocksDBIterator has been GC'd.
  // Verify that the Cleaner runs after a period of time, after which
  // it.isOwningHandle() becomes false.
  assertTimeout(java.time.Duration.ofSeconds(5), () -> assertFalse(it.isOwningHandle()));
  dbForCleanerTest.close();
}
```

Of course, we can also add a state flag to `ResourceCleaner` indicating whether it has been executed: initially false, flipped to true after execution. These are just some of my thoughts; there might be better ways to test this.
Re: [PR] [SPARK-45651][Build] Log memory usage of publish snapshot workflow [spark]
EnricoMi commented on PR #43513: URL: https://github.com/apache/spark/pull/43513#issuecomment-1777613933 CC @LuciferYang @HyukjinKwon
Re: [PR] [SPARK-45452][SQL][FOLLOWUP] Do not fail too early if FileSystem does not implement getSchema [spark]
dongjoon-hyun closed pull request #43508: [SPARK-45452][SQL][FOLLOWUP] Do not fail too early if FileSystem does not implement getSchema URL: https://github.com/apache/spark/pull/43508
[PR] [SPARK-45651][Build] Log memory usage of publish snapshot workflow [spark]
EnricoMi opened a new pull request, #43513: URL: https://github.com/apache/spark/pull/43513 ### What changes were proposed in this pull request? This logs memory consumption while publishing snapshots, to investigate whether the suspected high memory usage is the root cause of `publish_snapshots` failures for master. Merging this after #43512 allows running this manually. ### Why are the changes needed? The working assumption is that high memory usage is the root cause. This logging should prove or disprove that assumption. It can be reverted once more is known or SPARK-45651 is fixed. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Locally ### Was this patch authored or co-authored using generative AI tooling? No
Re: [PR] [SPARK-45536][BUILD] Lower the default `-Xmx` of `build/mvn` to 3g [spark]
EnricoMi commented on PR #43364: URL: https://github.com/apache/spark/pull/43364#issuecomment-1777584206 Another attempt to fix this in #43512 / SPARK-45651.
Re: [PR] [SPARK-45651][Build] Publish snapshot manually [spark]
EnricoMi commented on PR #43512: URL: https://github.com/apache/spark/pull/43512#issuecomment-1777582248 I have outlined a sequence of steps to fix the snapshot publish workflow in SPARK-45651. This is a precursor that can remain in master, as it is generally useful whenever publishing snapshots fails and a fix to the workflow itself is required - merely rerunning does not help there, since the rerun will not pick up the workflow fix. @LuciferYang @HyukjinKwon
[PR] [SPARK-45651][Build] Publish snapshot manually [spark]
EnricoMi opened a new pull request, #43512: URL: https://github.com/apache/spark/pull/43512 ### What changes were proposed in this pull request? With a manual trigger, the workflow can be executed manually after merging a fix of the workflow to master. This also allows running the workflow on only a subset of branches (e.g. those that failed). ### Why are the changes needed? Sometimes, publishing snapshots fails. If a fix of the workflow file is needed, that change can currently only be tested by waiting for the next day, when the cron event triggers the next publish. This is quite a long turnaround for testing fixes to that workflow (see #43364). ### Does this PR introduce _any_ user-facing change? No, this is purely build/CI related. ### How was this patch tested? Github workflow syntax tested in a private repo. ### Was this patch authored or co-authored using generative AI tooling? No
[PR] [SPARK-44405][SQL]Reduce code duplication in group-based DELETE and MERGE tests [spark]
zhaomin1423 opened a new pull request, #43511: URL: https://github.com/apache/spark/pull/43511 ### What changes were proposed in this pull request? Reduce code duplication in group-based DELETE and MERGE tests. ### Why are the changes needed? There are a few redundant methods; merging them reduces code duplication. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Passes GitHub Actions. ### Was this patch authored or co-authored using generative AI tooling? No
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
zhaomin1423 commented on PR #43502: URL: https://github.com/apache/spark/pull/43502#issuecomment-1777530901 > How do we finish the tests? I haven't thought of a good testing method yet; do you have any suggestions?
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
zhaomin1423 commented on code in PR #43502: URL: https://github.com/apache/spark/pull/43502#discussion_r1370432695 ## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java: ## @@ -36,20 +41,24 @@ class RocksDBIterator<T> implements KVStoreIterator<T> { private final byte[] indexKeyPrefix; private final byte[] end; private final long max; + private final Cleaner.Cleanable cleanable; private boolean checkedNext; private byte[] next; private boolean closed; private long count; - RocksDBIterator(Class<T> type, RocksDB db, KVStoreView<T> params) throws Exception { + RocksDBIterator(Class<T> type, RocksDB db, KVStoreView<T> params, + ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker, Review Comment: thanks, done
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
zhaomin1423 commented on code in PR #43502: URL: https://github.com/apache/spark/pull/43502#discussion_r1370432372 ## common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java: ## @@ -272,4 +270,32 @@ static int compare(byte[] a, byte[] b) { return a.length - b.length; } + static class ResourceCleaner implements Runnable { + + private final RocksIterator rocksIterator; + + private final AtomicReference<org.rocksdb.RocksDB> _db; + + private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker; + + public ResourceCleaner(RocksIterator rocksIterator, + AtomicReference<org.rocksdb.RocksDB> _db, + ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker) { + this.rocksIterator = rocksIterator; + this._db = _db; + this.iteratorTracker = iteratorTracker; + } + + @Override + public void run() { + synchronized (this._db) { + org.rocksdb.RocksDB _db = this._db.get(); + if (_db == null) { + return; + } + iteratorTracker.removeIf(ref -> ref.get() != null && rocksIterator.equals(ref.get().it)); Review Comment: thanks, done
Re: [PR] [SPARK-45533][CORE]Use j.l.r.Cleaner instead of finalize for RocksDBIterator [spark]
zhaomin1423 commented on PR #43502: URL: https://github.com/apache/spark/pull/43502#issuecomment-1777494074 > Although the current pr only fixes part of RocksDB now, I suggest adding the LevelDB part in this PR once the RocksDB part is deemed feasible. I prefer to complete them in one. OK, will add the LevelDB part later.
Re: [PR] [SPARK-45596][CONNECT] Use java.lang.ref.Cleaner instead of org.apache.spark.sql.connect.client.util.Cleaner [spark]
zhaomin1423 commented on PR #43439: URL: https://github.com/apache/spark/pull/43439#issuecomment-1777490785 > > What I am confused about is, how should we check the effect after gc is triggered > > Construct a case where `SparkResult` won't be manually closed, and check if after registering `cleanable`, `Cleaner.impl.phantomCleanableList` is not empty, but eventually `phantomCleanableList` is empty? This may require adding some code to get `phantomCleanableList` through reflection, just a suggestion, not sure if it's really feasible. We can't get `Cleaner.impl` via reflection; it throws an exception:

```
Exception in thread "main" java.lang.reflect.InaccessibleObjectException: Unable to make field final jdk.internal.ref.CleanerImpl java.lang.ref.Cleaner.impl accessible: module java.base does not "opens java.lang.ref" to unnamed module @2f92e0f4
  at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
  at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
  at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
  at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
```
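Since `Cleaner.impl` is unreachable via reflection under the module system, one alternative (a hedged sketch, not code from the PR) is to observe the cleanup action directly through a flag set by the registered `Runnable`:

```scala
import java.lang.ref.Cleaner
import java.util.concurrent.atomic.AtomicBoolean

val cleaner = Cleaner.create()
val cleaned = new AtomicBoolean(false)

class Holder
var holder = new Holder
// The Runnable must capture only the flag, never the tracked object itself,
// or the object can never become phantom reachable.
cleaner.register(holder, () => cleaned.set(true))

holder = null // drop the only strong reference
var attempts = 0
while (!cleaned.get() && attempts < 100) {
  System.gc()
  Thread.sleep(100)
  attempts += 1
}
assert(cleaned.get(), "cleaner action should have run after GC")
```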
Re: [PR] [SPARK-45646][SQL] Remove hardcoding time variables prior to Hive 2.0 [spark]
pan3793 commented on PR #43506: URL: https://github.com/apache/spark/pull/43506#issuecomment-1777422561 thanks @LuciferYang and all
Re: [PR] [SPARK-45646][SQL] Remove hardcoding time variables prior to Hive 2.0 [spark]
LuciferYang commented on PR #43506: URL: https://github.com/apache/spark/pull/43506#issuecomment-1777413584 Merged into master for Spark 4.0. Thanks @pan3793 @HyukjinKwon @yaooqinn
Re: [PR] [SPARK-45646][SQL] Remove hardcoding time variables prior to Hive 2.0 [spark]
LuciferYang closed pull request #43506: [SPARK-45646][SQL] Remove hardcoding time variables prior to Hive 2.0 URL: https://github.com/apache/spark/pull/43506
[PR] [Don't merge or review][WIP] Test k8s changes [spark]
junyuc25 opened a new pull request, #43510: URL: https://github.com/apache/spark/pull/43510 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [Don't merge or review] Test k8s changes [spark]
junyuc25 commented on PR #43509: URL: https://github.com/apache/spark/pull/43509#issuecomment-1777327745 \
Re: [PR] [Don't merge or review] Test k8s changes [spark]
junyuc25 closed pull request #43509: [Don't merge or review] Test k8s changes URL: https://github.com/apache/spark/pull/43509