[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan
ASF GitHub Bot commented on DRILL-8253
Re: Support the limit results in kafka scan

luocooong commented on code in PR #2580:
URL: https://github.com/apache/drill/pull/2580#discussion_r908637682

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java: ##
@@ -271,8 +292,11 @@ public void applyAssignments(List incomingEndpoints) {
   @Override
   public GroupScan applyLimit(int maxRecords) {
-    records = maxRecords; // Just apply the limit value into sub-scan
-    return super.applyLimit(maxRecords);
+    if (maxRecords > records) { // pass the limit value into sub-scan
+      return new KafkaGroupScan(this, maxRecords);
+    } else { // stop the transform
+      return super.applyLimit(maxRecords);

Review Comment:
This can return null directly.

This message was sent by Atlassian Jira (v8.20.10#820010-sha1:ace47f9)
[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan
ASF GitHub Bot commented on DRILL-8253
Re: Support the limit results in kafka scan

cgivre commented on code in PR #2580:
URL: https://github.com/apache/drill/pull/2580#discussion_r908624673

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java: ##
@@ -271,8 +292,11 @@ public void applyAssignments(List incomingEndpoints) {
   @Override
   public GroupScan applyLimit(int maxRecords) {
-    records = maxRecords; // Just apply the limit value into sub-scan
-    return super.applyLimit(maxRecords);
+    if (maxRecords > records) { // pass the limit value into sub-scan
+      return new KafkaGroupScan(this, maxRecords);
+    } else { // stop the transform
+      return super.applyLimit(maxRecords);

Review Comment:
Do we still need to call the `super` method here? Could we just return `null`?
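The guard discussed in this review comment can be sketched in isolation. The following is a minimal illustration, assuming (as the "// stop the transform" comment in the diff suggests) that a `null` return tells the caller no new scan was produced. `GuardedScan` and the driver loop `rewritesUntilNull` are hypothetical names invented for this sketch; they are not Drill classes, and how Drill's planner actually invokes `applyLimit` is not shown in this thread.

```java
final class GuardedLimitDemo {
  public static void main(String[] args) {
    System.out.println("rewrites before null: " + rewritesUntilNull(new GuardedScan(-1), 10));
  }

  // Toy driver (an assumption for illustration): re-applies the limit
  // until the scan signals "nothing to change" by returning null.
  static int rewritesUntilNull(GuardedScan scan, int limit) {
    int rewrites = 0;
    GuardedScan next;
    while ((next = scan.applyLimit(limit)) != null) {
      scan = next;
      rewrites++;
    }
    return rewrites;
  }
}

// Hypothetical stand-in for a group scan that carries a record limit.
final class GuardedScan {
  final int records; // -1 is assumed to mean "no limit applied yet"

  GuardedScan(int records) {
    this.records = records;
  }

  GuardedScan applyLimit(int maxRecords) {
    if (maxRecords > records) { // same condition as in the PR diff
      return new GuardedScan(maxRecords); // new instance, receiver untouched
    }
    return null; // stop: this limit is already reflected in the scan
  }
}
```

In this sketch, returning a new scan unconditionally would make the driver loop run forever, which is one plausible reason for returning `null` once the limit has been recorded.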
[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan
ASF GitHub Bot commented on DRILL-8253
Re: Support the limit results in kafka scan

luocooong commented on code in PR #2580:
URL: https://github.com/apache/drill/pull/2580#discussion_r908624939

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java: ##
@@ -262,6 +269,17 @@ public void applyAssignments(List incomingEndpoints) {
     assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    records = maxRecords; // Just apply the limit value into sub-scan
+    return super.applyLimit(maxRecords);
+  }
+

Review Comment:
Following your suggestion, I now always create an immutable GroupScan.
[jira] [Commented] (DRILL-8254) upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363
ASF GitHub Bot commented on DRILL-8254
Re: upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363

kingswanwho opened a new pull request, #2581:
URL: https://github.com/apache/drill/pull/2581

DRILL-8254 (https://issues.apache.org/jira/browse/DRILL-8254): upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363

Description
Upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363

Documentation
Please refer to https://github.com/advisories/GHSA-g76j-4cxx-23h9

Testing
UT test
[jira] [Updated] (DRILL-8254) upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363
Jingchuan Hu updated an issue
Apache Drill / DRILL-8254
upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363

Change By: Jingchuan Hu
Summary (old): Upgrade MySQL Connectors JAVA Due to CVE-2022-21363
Summary (new): upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363
[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan
ASF GitHub Bot commented on DRILL-8253
Re: Support the limit results in kafka scan

cgivre commented on code in PR #2580:
URL: https://github.com/apache/drill/pull/2580#discussion_r908569894

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java: ##
@@ -262,6 +269,17 @@ public void applyAssignments(List incomingEndpoints) {
     assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    records = maxRecords; // Just apply the limit value into sub-scan
+    return super.applyLimit(maxRecords);
+  }
+

Review Comment:
I wouldn't recommend that. My understanding of how all this works is that Calcite/Drill create multiple copies of the GroupScan to try to find the optimal query plan. The GroupScans have to be immutable; otherwise strange things will happen when you execute more complex queries, and in all likelihood the limit won't be pushed down.
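The immutability concern raised here can be illustrated with a toy model. This is only a sketch under stated assumptions: `MutableScan`, `ImmutableScan` and the `candidateLimits` "planner" are hypothetical names invented for the example, and the model merely assumes that a planner may keep the original scan as one plan candidate while deriving a limited variant from it. It does not reproduce Calcite's or Drill's actual planning logic.

```java
import java.util.ArrayList;
import java.util.List;

final class ScanAliasingDemo {
  public static void main(String[] args) {
    System.out.println("mutable:   " + candidateLimits(true));
    System.out.println("immutable: " + candidateLimits(false));
  }

  // Toy "planner": keeps the unlimited scan as one candidate, derives a
  // limited candidate from it, and reports the limit each candidate carries.
  static List<Integer> candidateLimits(boolean mutateInPlace) {
    List<Integer> limits = new ArrayList<>();
    if (mutateInPlace) {
      MutableScan unlimited = new MutableScan();
      MutableScan limited = unlimited.applyLimit(10); // same object as 'unlimited'
      limits.add(unlimited.records);
      limits.add(limited.records);
    } else {
      ImmutableScan unlimited = new ImmutableScan(-1);
      ImmutableScan limited = unlimited.applyLimit(10); // fresh object
      limits.add(unlimited.records);
      limits.add(limited.records);
    }
    return limits;
  }
}

// Mutating variant: the limit is written into the receiver itself.
final class MutableScan {
  int records = -1; // -1 is assumed to mean "no limit"

  MutableScan applyLimit(int maxRecords) {
    records = maxRecords;
    return this;
  }
}

// Immutable variant: the limit lives only in the newly created copy.
final class ImmutableScan {
  final int records;

  ImmutableScan(int records) {
    this.records = records;
  }

  ImmutableScan applyLimit(int maxRecords) {
    return new ImmutableScan(maxRecords);
  }
}
```

With the mutating variant, the candidate that was supposed to stay unlimited silently picks up the limit as well; with the immutable variant, the two candidates stay independent.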
[jira] [Created] (DRILL-8254) Upgrade MySQL Connectors JAVA Due to CVE-2022-21363
Jingchuan Hu created an issue
Apache Drill / DRILL-8254
Upgrade MySQL Connectors JAVA Due to CVE-2022-21363

Issue Type: Bug
Assignee: Unassigned
Created: 28/Jun/22 14:35
Priority: Major
Reporter: Jingchuan Hu

Upgrade MySQL Connectors JAVA to 8.0.28 due to CVE-2022-21363
https://github.com/advisories/GHSA-g76j-4cxx-23h9
[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan
ASF GitHub Bot commented on DRILL-8253
Re: Support the limit results in kafka scan

luocooong commented on code in PR #2580:
URL: https://github.com/apache/drill/pull/2580#discussion_r908439009

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java: ##
@@ -359,7 +382,7 @@ public KafkaStoragePlugin getStoragePlugin() {
   @Override
   public String toString() {
-    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", kafkaScanSpec, columns);
+    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s, records=%d]", kafkaScanSpec, columns, records);

Review Comment:
Good idea, thanks!
[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan
ASF GitHub Bot commented on DRILL-8253
Re: Support the limit results in kafka scan

luocooong commented on code in PR #2580:
URL: https://github.com/apache/drill/pull/2580#discussion_r908438483

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java: ##
@@ -77,6 +77,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
   private final KafkaScanSpec kafkaScanSpec;
   private List columns;
+  private int records;

Review Comment:
Done.
[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan
ASF GitHub Bot commented on DRILL-8253
Re: Support the limit results in kafka scan

luocooong commented on code in PR #2580:
URL: https://github.com/apache/drill/pull/2580#discussion_r908438074

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java: ##
@@ -262,6 +269,17 @@ public void applyAssignments(List incomingEndpoints) {
     assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    records = maxRecords; // Just apply the limit value into sub-scan
+    return super.applyLimit(maxRecords);
+  }
+

Review Comment:
Yes, I just call the parent method, which returns null (so the default behavior is not broken), because I only need to get the limit value from the execution plan.
[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan
ASF GitHub Bot commented on DRILL-8253
Re: Support the limit results in kafka scan

cgivre commented on code in PR #2580:
URL: https://github.com/apache/drill/pull/2580#discussion_r908416571

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java: ##
@@ -262,6 +269,17 @@ public void applyAssignments(List incomingEndpoints) {
     assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    records = maxRecords; // Just apply the limit value into sub-scan
+    return super.applyLimit(maxRecords);
+  }
+

Review Comment:
I'm fairly certain this isn't actually working. The super method always returns `null`.
https://github.com/apache/drill/blob/53e6f2697b31cb76a38e75fe95283512715d28c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java#L174-L178
What I think you need to do here is:
1. Create a clone constructor in the `KafkaGroupScan`
2. Call that and return a new `KafkaGroupScan` with the limit applied.

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java: ##
@@ -77,6 +77,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
   private final KafkaScanSpec kafkaScanSpec;
   private List columns;
+  private int records;

Review Comment:
These variables should all be `final`.

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java: ##
@@ -359,7 +382,7 @@ public KafkaStoragePlugin getStoragePlugin() {
   @Override
   public String toString() {
-    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", kafkaScanSpec, columns);
+    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s, records=%d]", kafkaScanSpec, columns, records);

Review Comment:
Could we use the `PlanStringBuilder` here?
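The two steps suggested in this review (a clone constructor, then returning a new scan with the limit applied) together with the request for `final` fields can be sketched as follows. `SketchGroupScan` is a hypothetical, dependency-free stand-in written for this illustration; its field names loosely follow the diff, but it is not the code merged in PR #2580 and it does not extend Drill's `AbstractGroupScan`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a group scan; it is NOT Drill's KafkaGroupScan.
final class SketchGroupScan {
  private final String scanSpec;
  private final List<String> columns;
  private final int records; // -1 is assumed to mean "no limit pushed down"

  SketchGroupScan(String scanSpec, List<String> columns) {
    this.scanSpec = scanSpec;
    this.columns = Collections.unmodifiableList(new ArrayList<>(columns));
    this.records = -1;
  }

  // Step 1, clone constructor: copy every field from 'that',
  // overriding only the record limit.
  SketchGroupScan(SketchGroupScan that, int records) {
    this.scanSpec = that.scanSpec;
    this.columns = that.columns;
    this.records = records;
  }

  // Step 2: return a new instance with the limit applied;
  // the receiver is never modified.
  SketchGroupScan applyLimit(int maxRecords) {
    return new SketchGroupScan(this, maxRecords);
  }

  int getRecords() {
    return records;
  }

  @Override
  public String toString() {
    return String.format("SketchGroupScan [scanSpec=%s, columns=%s, records=%d]",
        scanSpec, columns, records);
  }

  public static void main(String[] args) {
    SketchGroupScan original = new SketchGroupScan("topic-a", Collections.singletonList("*"));
    SketchGroupScan limited = original.applyLimit(10);
    System.out.println(original);
    System.out.println(limited);
  }
}
```

Because every field is `final`, the only way to obtain a scan with a different limit is to construct a new object, which is what makes the clone constructor necessary.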
[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan
ASF GitHub Bot commented on DRILL-8253
Re: Support the limit results in kafka scan

luocooong opened a new pull request, #2580:
URL: https://github.com/apache/drill/pull/2580

DRILL-8253 (https://issues.apache.org/jira/browse/DRILL-8253): Support the limit results in kafka scan

Description
In the current implementation of the kafka storage, although we use the limit detection method in the result loader, the actual `maxRecords` is always -1. This revision gets the limit (obtained from SQL parsing at the execution plan stage) from the group scan, passes it to the sub-scan, and finally applies it in the result loader.

Documentation
N/A

Testing
Added the unit test KafkaQueriesTest#testResultLimit().
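The flow described in this pull request (group scan, then sub-scan, then result loader) can be sketched with plain Java stand-ins. All class names below (`FlowGroupScan`, `FlowSubScan`, `FlowReader`) are hypothetical and exist only for this illustration, and treating a negative `maxRecords` as "no limit" is an assumption based on the description's remark that the value was always -1. Drill's real result loader API is not used here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class LimitFlowDemo {
  public static void main(String[] args) {
    List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
    FlowGroupScan groupScan = new FlowGroupScan(2); // limit taken from the plan
    System.out.println(FlowReader.read(groupScan.toSubScan(), messages.iterator()));
  }
}

// Planning side: holds the limit and hands it to the sub-scan.
final class FlowGroupScan {
  private final int records;

  FlowGroupScan(int records) {
    this.records = records;
  }

  FlowSubScan toSubScan() {
    return new FlowSubScan(records);
  }
}

// Execution side: carries the limit to the reader.
final class FlowSubScan {
  final int maxRecords;

  FlowSubScan(int maxRecords) {
    this.maxRecords = maxRecords;
  }
}

// Reader: stops consuming once the limit is reached.
final class FlowReader {
  static List<String> read(FlowSubScan subScan, Iterator<String> source) {
    List<String> out = new ArrayList<>();
    while (source.hasNext()) {
      // A negative maxRecords is assumed to mean "no limit".
      if (subScan.maxRecords >= 0 && out.size() >= subScan.maxRecords) {
        break;
      }
      out.add(source.next());
    }
    return out;
  }
}
```

If the limit never leaves the group scan, the reader in this sketch sees -1 and reads everything, which mirrors the behavior the pull request sets out to fix.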
[jira] [Created] (DRILL-8253) Support the limit results in kafka scan
Cong Luo created an issue
Apache Drill / DRILL-8253
Support the limit results in kafka scan

Issue Type: Improvement
Assignee: Cong Luo
Components: Storage - Kafka
Created: 28/Jun/22 10:42
Fix Versions: 1.20.2
Priority: Major
Reporter: Cong Luo

In the current implementation of the kafka storage, although we use the limit detection method in the result loader, the actual maxRecords is always -1.