This is an automated email from the ASF dual-hosted git repository. dzamo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 5ce0c88ffd DRILL-8426: Fix endless retrying zk set data for a large query (#2796) 5ce0c88ffd is described below commit 5ce0c88ffd6e50245da5ef30cf47a321acaa12fb Author: Maksym Rymar <rim.maxim+...@gmail.com> AuthorDate: Tue May 2 16:01:31 2023 +0300 DRILL-8426: Fix endless retrying zk set data for a large query (#2796) --- .../drill/exec/coord/zk/ZookeeperClient.java | 29 +++++++++++++++++++--- .../apache/drill/exec/work/foreman/Foreman.java | 3 +-- .../drill/exec/work/foreman/QueryManager.java | 10 ++++++-- .../java-exec/src/main/resources/drill-module.conf | 2 +- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java index fee607d2ab..ec98453e05 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java @@ -33,10 +33,13 @@ import org.apache.drill.common.collections.ImmutableEntry; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.exception.VersionMismatchException; import org.apache.drill.exec.store.sys.store.DataChangeVersion; +import org.apache.jute.BinaryInputArchive; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A namespace aware Zookeeper client. @@ -46,10 +49,12 @@ import org.apache.zookeeper.data.Stat; * Note that instance of this class holds onto resources that must be released via {@code #close()}. 
*/ public class ZookeeperClient implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(ZookeeperClient.class); private final CuratorFramework curator; private final String root; private final PathChildrenCache cache; private final CreateMode mode; + private final int MAX_DATA_LENGTH = BinaryInputArchive.maxBuffer; public ZookeeperClient(final CuratorFramework curator, final String root, final CreateMode mode) { this.curator = Preconditions.checkNotNull(curator, "curator is required"); @@ -241,6 +246,7 @@ public class ZookeeperClient implements AutoCloseable { * * @param path target path * @param data data to store + * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value */ public void put(final String path, final byte[] data) { put(path, data, null); @@ -248,9 +254,9 @@ public class ZookeeperClient implements AutoCloseable { /** * Puts the given byte sequence into the given path. - * + * <p> * If path does not exist, this call creates it. - * + * <p> * If version holder is not null and path already exists, passes given version for comparison. * Zookeeper maintains stat structure that holds version number which increases each time znode data change is performed. * If we pass version that doesn't match the actual version of the data, @@ -258,13 +264,19 @@ public class ZookeeperClient implements AutoCloseable { * We catch such exception and re-throw it as {@link VersionMismatchException}. 
* Link to documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes * - * @param path target path - * @param data data to store + * @param path target path + * @param data data to store * @param version version holder + * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value */ public void put(final String path, final byte[] data, DataChangeVersion version) { Preconditions.checkNotNull(path, "path is required"); Preconditions.checkNotNull(data, "data is required"); + if (data.length > MAX_DATA_LENGTH) { + throw new IllegalArgumentException( + String.format("Can't put this data. Data size %d bytes is bigger than jute.maxbuffer %d", data.length, MAX_DATA_LENGTH) + ); + } final String target = PathUtils.join(root, path); try { @@ -297,6 +309,8 @@ public class ZookeeperClient implements AutoCloseable { } catch (final VersionMismatchException e) { throw e; } catch (final Exception e) { + logger.info("Data size to persist is: {} bytes, client jute.maxbuffer value is: {}. Make sure that the client " + + "jute.maxbuffer value corresponds to the zookeeper server value.", data.length, MAX_DATA_LENGTH); throw new DrillRuntimeException("unable to put ", e); } } @@ -311,6 +325,11 @@ public class ZookeeperClient implements AutoCloseable { public byte[] putIfAbsent(final String path, final byte[] data) { Preconditions.checkNotNull(path, "path is required"); Preconditions.checkNotNull(data, "data is required"); + if (data.length > MAX_DATA_LENGTH) { + throw new IllegalArgumentException( + String.format("Can't put this data. 
Data size %d bytes is bigger than jute.maxbuffer %d", data.length, MAX_DATA_LENGTH) + ); + } final String target = PathUtils.join(root, path); try { @@ -323,6 +342,8 @@ public class ZookeeperClient implements AutoCloseable { } return curator.getData().forPath(target); } catch (final Exception e) { + logger.info("Data size to persist is: {} bytes, client jute.maxbuffer value is: {}. Make sure that the client " + + "jute.maxbuffer value corresponds to the zookeeper server value.", data.length, MAX_DATA_LENGTH); throw new DrillRuntimeException("unable to put ", e); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 1ddc1150cc..202163533d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -252,9 +252,8 @@ public class Foreman implements Runnable { } queryText = queryRequest.getPlan(); - queryStateProcessor.moveToState(QueryState.PLANNING, null); - try { + queryStateProcessor.moveToState(QueryState.PLANNING, null); injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class); // convert a run query request into action diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index e99ba5a402..10679b061c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -297,8 +297,14 @@ public class QueryManager implements AutoCloseable { case STARTING: case RUNNING: case CANCELLATION_REQUESTED: - runningProfileStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile. 
- inTransientStore = true; + try { + runningProfileStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile. + inTransientStore = true; + } catch (IllegalArgumentException e) { + throw UserException.executionError(e) + .message("Failed to persist query info. Query length is too big.", e) + .build(logger); + } break; case COMPLETED: case CANCELED: diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 488d8af088..ff817b5e4b 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -142,7 +142,7 @@ drill.exec: { refresh: 500, timeout: 5000, retry: { - count: 7200, + count: 15, delay: 500 }, apply_secure_acl: false,