[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan

2022-06-28 Thread ASF GitHub Bot (Jira)
ASF GitHub Bot commented on DRILL-8253

Re: Support the limit results in kafka scan

luocooong commented on code in PR #2580: URL: https://github.com/apache/drill/pull/2580#discussion_r908637682

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java:

```diff
@@ -271,8 +292,11 @@ public void applyAssignments(List incomingEndpoints) {
   @Override
   public GroupScan applyLimit(int maxRecords) {
-    records = maxRecords; // Just apply the limit value into sub-scan
-    return super.applyLimit(maxRecords);
+    if (maxRecords > records) { // pass the limit value into sub-scan
+      return new KafkaGroupScan(this, maxRecords);
+    } else { // stop the transform
+      return super.applyLimit(maxRecords);
+    }
```

Review Comment: `null` can be returned directly.
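The pattern under review (return a copy of the scan with the limit applied, or signal "no change") can be sketched with a self-contained stand-in. `KafkaGroupScanSketch`, its field, and the copy constructor are illustrative only, not the actual Drill classes; in Drill, `applyLimit` returning `null` tells the planner the scan is unchanged, as the thread notes:

```java
// Toy model of the limit-pushdown contract discussed above.
class KafkaGroupScanSketch {
  final int records; // pushed-down limit; -1 means no limit applied yet

  KafkaGroupScanSketch(int records) {
    this.records = records;
  }

  // Copy ("clone") constructor: same scan, new limit.
  KafkaGroupScanSketch(KafkaGroupScanSketch that, int maxRecords) {
    this.records = maxRecords;
  }

  KafkaGroupScanSketch applyLimit(int maxRecords) {
    if (records < 0 || maxRecords < records) {
      // Tighter limit than before: hand the planner a new, limited scan.
      return new KafkaGroupScanSketch(this, maxRecords);
    }
    // Nothing to improve: return null directly, as suggested in the review.
    return null;
  }
}
```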
This message was sent by Atlassian Jira (v8.20.10#820010-sha1:ace47f9)

[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan

2022-06-28 Thread ASF GitHub Bot (Jira)
ASF GitHub Bot commented on DRILL-8253

Re: Support the limit results in kafka scan

cgivre commented on code in PR #2580: URL: https://github.com/apache/drill/pull/2580#discussion_r908624673

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java:

```diff
@@ -271,8 +292,11 @@ public void applyAssignments(List incomingEndpoints) {
   @Override
   public GroupScan applyLimit(int maxRecords) {
-    records = maxRecords; // Just apply the limit value into sub-scan
-    return super.applyLimit(maxRecords);
+    if (maxRecords > records) { // pass the limit value into sub-scan
+      return new KafkaGroupScan(this, maxRecords);
+    } else { // stop the transform
+      return super.applyLimit(maxRecords);
+    }
```

Review Comment: Do we still need to call the `super` method here? Could we just return `null`?

[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan

2022-06-28 Thread ASF GitHub Bot (Jira)
ASF GitHub Bot commented on DRILL-8253

Re: Support the limit results in kafka scan

luocooong commented on code in PR #2580: URL: https://github.com/apache/drill/pull/2580#discussion_r908624939

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java:

```diff
@@ -262,6 +269,17 @@ public void applyAssignments(List incomingEndpoints) {
     assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
+
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    records = maxRecords; // Just apply the limit value into sub-scan
+    return super.applyLimit(maxRecords);
+  }
```

Review Comment: Following your suggestion, I now always create an immutable group scan.

[jira] [Commented] (DRILL-8254) upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363

2022-06-28 Thread ASF GitHub Bot (Jira)
ASF GitHub Bot commented on DRILL-8254

Re: upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363

kingswanwho opened a new pull request, #2581: URL: https://github.com/apache/drill/pull/2581

[DRILL-8254](https://issues.apache.org/jira/browse/DRILL-8254): upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363

Description: Upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363.

Documentation: Please refer to https://github.com/advisories/GHSA-g76j-4cxx-23h9

Testing: UT test

[jira] [Updated] (DRILL-8254) upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363

2022-06-28 Thread Jingchuan Hu (Jira)
Jingchuan Hu updated an issue

Apache Drill / DRILL-8254

upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363

Change By: Jingchuan Hu

Summary: Upgrade MySQL Connectors JAVA Due to CVE-2022-21363 → upgrade mysql-connectors-java to 8.0.28 due to CVE-2022-21363

[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan

2022-06-28 Thread ASF GitHub Bot (Jira)
ASF GitHub Bot commented on DRILL-8253

Re: Support the limit results in kafka scan

cgivre commented on code in PR #2580: URL: https://github.com/apache/drill/pull/2580#discussion_r908569894

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java:

```diff
@@ -262,6 +269,17 @@ public void applyAssignments(List incomingEndpoints) {
     assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
+
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    records = maxRecords; // Just apply the limit value into sub-scan
+    return super.applyLimit(maxRecords);
+  }
```

Review Comment: I wouldn't recommend that. My understanding of how all this works is that Calcite/Drill create multiple copies of the GroupScan to try to find the optimal query plan. The GroupScans have to be immutable, otherwise strange things will happen when you execute more complex queries, and in all likelihood the limit won't be pushed down.
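The immutability point can be illustrated with a toy example (hypothetical class, not Drill code): because the planner may keep several candidate plans alive at once, applying a limit must produce a new scan object and leave the original untouched:

```java
// Immutable scan sketch: withLimit never mutates the receiver, so a planner
// holding the original object as a fallback alternative still sees -1.
final class ImmutableScanSketch {
  final int maxRecords;

  ImmutableScanSketch(int maxRecords) {
    this.maxRecords = maxRecords;
  }

  ImmutableScanSketch withLimit(int limit) {
    return new ImmutableScanSketch(limit); // fresh copy, original unchanged
  }
}
```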

[jira] [Created] (DRILL-8254) Upgrade MySQL Connectors JAVA Due to CVE-2022-21363

2022-06-28 Thread Jingchuan Hu (Jira)
Jingchuan Hu created an issue

Apache Drill / DRILL-8254

Upgrade MySQL Connectors JAVA Due to CVE-2022-21363

Issue Type: Bug
Assignee: Unassigned
Created: 28/Jun/22 14:35
Priority: Major
Reporter: Jingchuan Hu

Upgrade MySQL Connectors JAVA to 8.0.28 due to CVE-2022-21363: https://github.com/advisories/GHSA-g76j-4cxx-23h9

[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan

2022-06-28 Thread ASF GitHub Bot (Jira)
ASF GitHub Bot commented on DRILL-8253

Re: Support the limit results in kafka scan

luocooong commented on code in PR #2580: URL: https://github.com/apache/drill/pull/2580#discussion_r908439009

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java:

```diff
@@ -359,7 +382,7 @@ public KafkaStoragePlugin getStoragePlugin() {
   @Override
   public String toString() {
-    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", kafkaScanSpec, columns);
+    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s, records=%d]", kafkaScanSpec, columns, records);
```

Review Comment: Good idea, thanks!

[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan

2022-06-28 Thread ASF GitHub Bot (Jira)
ASF GitHub Bot commented on DRILL-8253

Re: Support the limit results in kafka scan

luocooong commented on code in PR #2580: URL: https://github.com/apache/drill/pull/2580#discussion_r908438483

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java:

```diff
@@ -77,6 +77,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
   private final KafkaScanSpec kafkaScanSpec;
   private List columns;
+  private int records;
```

Review Comment: Done.

[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan

2022-06-28 Thread ASF GitHub Bot (Jira)
ASF GitHub Bot commented on DRILL-8253

Re: Support the limit results in kafka scan

luocooong commented on code in PR #2580: URL: https://github.com/apache/drill/pull/2580#discussion_r908438074

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java:

```diff
@@ -262,6 +269,17 @@ public void applyAssignments(List incomingEndpoints) {
     assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
+
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    records = maxRecords; // Just apply the limit value into sub-scan
+    return super.applyLimit(maxRecords);
+  }
```

Review Comment: Yes, I just call the parent interface, which returns null (as long as the default behavior is not broken), because I only need to get the limit value from the execution plan.

[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan

2022-06-28 Thread ASF GitHub Bot (Jira)
ASF GitHub Bot commented on DRILL-8253

Re: Support the limit results in kafka scan

cgivre commented on code in PR #2580: URL: https://github.com/apache/drill/pull/2580#discussion_r908416571

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java:

```diff
@@ -262,6 +269,17 @@ public void applyAssignments(List incomingEndpoints) {
     assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
+
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    records = maxRecords; // Just apply the limit value into sub-scan
+    return super.applyLimit(maxRecords);
+  }
```

Review Comment: I'm fairly certain this isn't actually working. The super method always returns `null`: https://github.com/apache/drill/blob/53e6f2697b31cb76a38e75fe95283512715d28c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java#L174-L178
What I think you need to do here is:
1. Create a clone constructor in the `KafkaGroupScan`.
2. Call that and return a new `KafkaGroupScan` with the limit applied.

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java:

```diff
@@ -77,6 +77,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
   private final KafkaScanSpec kafkaScanSpec;
   private List columns;
+  private int records;
```

Review Comment: These variables should all be `final`.

## contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java:

```diff
@@ -359,7 +382,7 @@ public KafkaStoragePlugin getStoragePlugin() {
   @Override
   public String toString() {
-    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", kafkaScanSpec, columns);
+    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s, records=%d]", kafkaScanSpec, columns, records);
```

Review Comment: Could we use the `PlanStringBuilder` here?
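For the `PlanStringBuilder` suggestion, the general idea is a small fluent builder that renders plan-node fields in a uniform `Name [k=v, ...]` shape. The class below is a self-contained approximation written for illustration; Drill's real `PlanStringBuilder` differs in detail:

```java
// Toy fluent builder in the spirit of the PlanStringBuilder suggestion.
final class ToStringBuilderSketch {
  private final StringBuilder sb;
  private boolean first = true;

  ToStringBuilderSketch(Object owner) {
    // Start with the owning class name, e.g. "KafkaGroupScan [".
    sb = new StringBuilder(owner.getClass().getSimpleName()).append(" [");
  }

  ToStringBuilderSketch field(String name, Object value) {
    if (!first) {
      sb.append(", ");
    }
    first = false;
    sb.append(name).append('=').append(value);
    return this;
  }

  @Override
  public String toString() {
    return sb.append(']').toString();
  }
}
```

A `toString()` using such a builder would then read like `new ToStringBuilderSketch(this).field("kafkaScanSpec", kafkaScanSpec).field("records", records).toString()`, so every field added later formats consistently.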

[jira] [Commented] (DRILL-8253) Support the limit results in kafka scan

2022-06-28 Thread ASF GitHub Bot (Jira)
ASF GitHub Bot commented on DRILL-8253

Re: Support the limit results in kafka scan

luocooong opened a new pull request, #2580: URL: https://github.com/apache/drill/pull/2580

[DRILL-8253](https://issues.apache.org/jira/browse/DRILL-8253): Support the limit results in kafka scan

Description: In the current implementation of the kafka storage, although we use the limit detection method in the result loader, the actual `maxRecords` is always -1. The mechanism of this revision is to get the limit (parsed from the SQL at the execution-plan stage) from the group scan, pass it to the sub-scan, and finally process it in the result loader.

Documentation: N/A

Testing: Added the unit test KafkaQueriesTest#testResultLimit().
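The flow described above — capture the limit at plan time, then enforce it in the reader loop — can be sketched as follows; the class and method names are illustrative, not the actual Kafka plugin classes:

```java
import java.util.Iterator;
import java.util.List;

// Toy reader loop: stop consuming once maxRecords rows are loaded.
// maxRecords = -1 means no LIMIT was pushed down from the plan.
final class LimitedReaderSketch {
  static int readAll(Iterator<String> messages, int maxRecords, List<String> out) {
    int count = 0;
    while (messages.hasNext()) {
      if (maxRecords >= 0 && count >= maxRecords) {
        break; // limit reached: stop polling early instead of draining the topic
      }
      out.add(messages.next());
      count++;
    }
    return count;
  }
}
```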

[jira] [Created] (DRILL-8253) Support the limit results in kafka scan

2022-06-28 Thread Cong Luo (Jira)
Cong Luo created an issue

Apache Drill / DRILL-8253

Support the limit results in kafka scan

Issue Type: Improvement
Assignee: Cong Luo
Components: Storage - Kafka
Created: 28/Jun/22 10:42
Fix Versions: 1.20.2
Priority: Major
Reporter: Cong Luo

In the current implementation of the kafka storage, although we use the limit detection method in the result loader, the actual maxRecords is always -1.