This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch ql in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 2a3954e5c2a11b2215edaf17c5e576e803719527 Author: kezhenxu94 <[email protected]> AuthorDate: Wed Sep 17 17:18:18 2025 +0800 ql --- bydbctl/internal/cmd/bydbql.go | 378 +++++++++++++++++ bydbctl/internal/cmd/bydbql_test.go | 345 ++++++++++++++++ bydbctl/internal/cmd/root.go | 2 +- pkg/bydbql/ast.go | 327 +++++++++++++++ pkg/bydbql/bydbql_suite_test.go | 30 ++ pkg/bydbql/bydbql_test.go | 465 +++++++++++++++++++++ pkg/bydbql/lexer.go | 608 +++++++++++++++++++++++++++ pkg/bydbql/parser.go | 796 ++++++++++++++++++++++++++++++++++++ pkg/bydbql/translator.go | 591 ++++++++++++++++++++++++++ 9 files changed, 3541 insertions(+), 1 deletion(-) diff --git a/bydbctl/internal/cmd/bydbql.go b/bydbctl/internal/cmd/bydbql.go new file mode 100644 index 00000000..67545eae --- /dev/null +++ b/bydbctl/internal/cmd/bydbql.go @@ -0,0 +1,378 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package cmd + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "os" + "strings" + + "github.com/go-resty/resty/v2" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "sigs.k8s.io/yaml" + + "github.com/apache/skywalking-banyandb/pkg/bydbql" + "github.com/apache/skywalking-banyandb/pkg/version" +) + +// BydbQL endpoint paths +const ( + streamQueryPath = "/api/v1/stream/query" + measureQueryPath = "/api/v1/measure/query" + traceQueryPath = "/api/v1/trace/query" + propertyQueryPath = "/api/v1/property/query" + topnQueryPath = "/api/v1/measure/topn" +) + +// newBydbQLCmd creates the BydbQL command +func newBydbQLCmd() *cobra.Command { + bydbqlCmd := &cobra.Command{ + Use: "bydbql", + Version: version.Build(), + Short: "BanyanDB Query Language (BydbQL) interface", + Long: `BanyanDB Query Language (BydbQL) provides a SQL-like interface for querying BanyanDB. +It supports querying streams, measures, traces, properties, and top-N operations with familiar SQL syntax. + +Examples: + # Interactive mode + bydbctl bydbql + + # Execute query from file + bydbctl bydbql query -f query.sql + + # Execute query from stdin + echo "SELECT * FROM STREAM sw WHERE service_id = 'webapp'" | bydbctl bydbql query -f -`, + } + + // Interactive command + interactiveCmd := &cobra.Command{ + Use: "interactive", + Aliases: []string{"i"}, + Version: version.Build(), + Short: "Start interactive BydbQL shell", + RunE: func(cmd *cobra.Command, _ []string) error { + return runInteractiveMode(cmd) + }, + } + + // Query command + queryCmd := &cobra.Command{ + Use: "query [-f file]", + Version: version.Build(), + Short: "Execute BydbQL query from file or stdin", + Long: `Execute BydbQL query from a file or stdin. The query will be parsed and translated to YAML format, +then sent to the appropriate BanyanDB endpoint. 
+ +Supported query types: + - SELECT queries for streams, measures, traces, and properties + - SHOW TOP N queries for top-N operations + +Examples: + bydbctl bydbql query -f query.sql + echo "SELECT * FROM STREAM sw" | bydbctl bydbql query -f -`, + RunE: func(cmd *cobra.Command, _ []string) error { + return runQueryMode(cmd) + }, + } + + bindFileFlag(queryCmd) + bindTLSRelatedFlag(interactiveCmd, queryCmd) + + bydbqlCmd.AddCommand(interactiveCmd, queryCmd) + + // Default to interactive mode if no subcommand specified + bydbqlCmd.RunE = func(cmd *cobra.Command, _ []string) error { + return runInteractiveMode(cmd) + } + + return bydbqlCmd +} + +// runInteractiveMode starts the interactive BydbQL shell +func runInteractiveMode(cmd *cobra.Command) error { + fmt.Println("BanyanDB Query Language (BydbQL) Interactive Shell") + fmt.Println("Type 'help' for help, 'exit' or 'quit' to quit.") + fmt.Println() + + scanner := bufio.NewScanner(os.Stdin) + + for { + fmt.Print("bydbql> ") + + if !scanner.Scan() { + break + } + + line := strings.TrimSpace(scanner.Text()) + + if line == "" { + continue + } + + // Handle special commands + switch strings.ToLower(line) { + case "help": + printHelp() + continue + case "exit", "quit": + fmt.Println("Goodbye!") + return nil + case "\\q": + fmt.Println("Goodbye!") + return nil + } + + // Check for multi-line queries (ending with semicolon) + query := line + for !strings.HasSuffix(strings.TrimSpace(query), ";") { + fmt.Print(" -> ") + if !scanner.Scan() { + break + } + query += " " + scanner.Text() + } + + // Remove trailing semicolon + query = strings.TrimSuffix(strings.TrimSpace(query), ";") + + // Execute the query + if err := executeBydbQLQuery(query); err != nil { + fmt.Printf("Error: %v\n", err) + } + fmt.Println() + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("error reading input: %w", err) + } + + return nil +} + +// runQueryMode executes BydbQL query from file or stdin +func runQueryMode(cmd *cobra.Command) 
error { + var reader io.Reader + + if filePath == "-" || filePath == "" { + reader = cmd.InOrStdin() + } else { + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("error opening file %s: %w", filePath, err) + } + defer file.Close() + reader = file + } + + // Read the entire query + content, err := io.ReadAll(reader) + if err != nil { + return fmt.Errorf("error reading query: %w", err) + } + + query := strings.TrimSpace(string(content)) + query = strings.TrimSuffix(query, ";") + + if query == "" { + return fmt.Errorf("empty query") + } + + return executeBydbQLQuery(query) +} + +// executeBydbQLQuery parses and executes a BydbQL query +func executeBydbQLQuery(query string) error { + // Create query context + context := &bydbql.QueryContext{ + DefaultGroup: viper.GetString("group"), + DefaultResourceType: bydbql.ResourceTypeAuto, + } + + // Parse the query + parsed, parseErrors := bydbql.ParseQuery(query) + if len(parseErrors) > 0 { + return fmt.Errorf("parsing errors:\n %s", strings.Join(parseErrors, "\n ")) + } + + if parsed == nil { + return fmt.Errorf("failed to parse query") + } + + // Merge context + if context.DefaultGroup != "" && len(parsed.Groups) == 0 { + parsed.Groups = []string{context.DefaultGroup} + } + + // Translate to YAML + translator := bydbql.NewTranslator(context) + yamlData, err := translator.TranslateToMap(parsed) + if err != nil { + return fmt.Errorf("translation error: %w", err) + } + fmt.Printf("Translated YAML:\n%s\n", yamlData) + + // Convert to JSON for REST API + jsonData, err := json.Marshal(yamlData) + if err != nil { + return fmt.Errorf("JSON conversion error: %w", err) + } + + // Determine endpoint based on query type + endpoint, err := determineEndpoint(parsed) + if err != nil { + return err + } + + // Execute the query via REST API + return executeRESTQuery(endpoint, jsonData) +} + +// determineEndpoint determines the REST endpoint based on query type +func determineEndpoint(parsed *bydbql.ParsedQuery) (string, 
error) { + switch parsed.Statement.(type) { + case *bydbql.SelectStatement: + // Determine resource type + resourceType := parsed.ResourceType + if resourceType == bydbql.ResourceTypeAuto { + // Default to stream if not specified + resourceType = bydbql.ResourceTypeStream + } + + switch resourceType { + case bydbql.ResourceTypeStream: + return streamQueryPath, nil + case bydbql.ResourceTypeMeasure: + return measureQueryPath, nil + case bydbql.ResourceTypeTrace: + return traceQueryPath, nil + case bydbql.ResourceTypeProperty: + return propertyQueryPath, nil + default: + return "", fmt.Errorf("unsupported resource type: %s", resourceType.String()) + } + + case *bydbql.TopNStatement: + return topnQueryPath, nil + + default: + return "", fmt.Errorf("unsupported statement type") + } +} + +// executeRESTQuery executes the query via REST API +func executeRESTQuery(endpoint string, jsonData []byte) error { + // Create REST request using existing infrastructure + reqBodyData := reqBody{ + data: jsonData, + } + + // Use existing rest function with appropriate parameters + return rest( + func() ([]reqBody, error) { + return []reqBody{reqBodyData}, nil + }, + func(req request) (*resty.Response, error) { + url := viper.GetString("addr") + endpoint + resp, err := req.req. + SetHeader("Content-Type", "application/json"). + SetBody(req.data). 
+ Post(url) + return resp, err + }, + yamlPrinter, + enableTLS, + insecure, + cert, + ) +} + +// printHelp prints help information for the interactive mode +func printHelp() { + fmt.Println("BanyanDB Query Language (BydbQL) Help") + fmt.Println("=====================================") + fmt.Println() + fmt.Println("BydbQL supports SQL-like syntax for querying BanyanDB resources:") + fmt.Println() + fmt.Println("STREAM QUERIES:") + fmt.Println(" SELECT * FROM STREAM sw WHERE service_id = 'webapp';") + fmt.Println(" SELECT trace_id, service_id FROM STREAM sw TIME > '-30m';") + fmt.Println(" SELECT * FROM STREAM sw TIME BETWEEN '2023-01-01T00:00:00Z' AND '2023-01-02T00:00:00Z';") + fmt.Println() + fmt.Println("MEASURE QUERIES:") + fmt.Println(" SELECT region, SUM(latency) FROM MEASURE service_cpm GROUP BY region;") + fmt.Println(" SELECT TOP 10 instance, cpu_usage FROM MEASURE instance_metrics ORDER BY cpu_usage DESC;") + fmt.Println(" SELECT AVG(response_time) FROM MEASURE http_metrics WHERE region = 'us-west';") + fmt.Println() + fmt.Println("TRACE QUERIES:") + fmt.Println(" SELECT trace_id, service_id FROM TRACE sw_trace WHERE status = 'error';") + fmt.Println(" SELECT () FROM TRACE sw_trace WHERE service_id = 'webapp' LIMIT 100;") + fmt.Println() + fmt.Println("PROPERTY QUERIES:") + fmt.Println(" SELECT ip, owner FROM PROPERTY server_metadata WHERE datacenter = 'dc-101';") + fmt.Println(" SELECT * FROM PROPERTY server_metadata WHERE ID = 'server-123';") + fmt.Println() + fmt.Println("TOP-N QUERIES:") + fmt.Println(" SHOW TOP 10 FROM MEASURE service_latency ORDER BY value DESC;") + fmt.Println(" SHOW TOP 5 FROM MEASURE service_errors WHERE status_code = '500';") + fmt.Println() + fmt.Println("TIME FORMATS:") + fmt.Println(" Absolute: '2023-01-01T00:00:00Z' (RFC3339)") + fmt.Println(" Relative: '-30m', '2h', '-1d', '-1w', 'now'") + fmt.Println() + fmt.Println("OPERATORS:") + fmt.Println(" =, !=, >, <, >=, <=, IN, NOT IN, HAVING, NOT HAVING, MATCH") + fmt.Println(" 
AND, OR for combining conditions") + fmt.Println() + fmt.Println("SPECIAL COMMANDS:") + fmt.Println(" help - Show this help") + fmt.Println(" exit - Exit the shell") + fmt.Println(" quit - Exit the shell") + fmt.Println(" \\q - Exit the shell") + fmt.Println() + fmt.Println("QUERY TRACING:") + fmt.Println(" Add 'WITH QUERY_TRACE' to any query for distributed tracing") + fmt.Println() + fmt.Println("NOTE: Queries can span multiple lines and should end with semicolon (;)") +} + +// printQueryResult prints the query result in YAML format +func printQueryResult(data []byte) error { + // The data is already in JSON format from the REST response + // Convert it to YAML for better readability + var jsonData interface{} + if err := json.Unmarshal(data, &jsonData); err != nil { + // If JSON parsing fails, print raw data + fmt.Println(string(data)) + return nil + } + + yamlData, err := yaml.Marshal(jsonData) + if err != nil { + // If YAML conversion fails, print JSON data + fmt.Println(string(data)) + return nil + } + + fmt.Print(string(yamlData)) + return nil +} + diff --git a/bydbctl/internal/cmd/bydbql_test.go b/bydbctl/internal/cmd/bydbql_test.go new file mode 100644 index 00000000..1b13e792 --- /dev/null +++ b/bydbctl/internal/cmd/bydbql_test.go @@ -0,0 +1,345 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cmd + +import ( + "bytes" + "strings" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/zenizh/go-capturer" + "sigs.k8s.io/yaml" + + "github.com/apache/skywalking-banyandb/pkg/bydbql" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/setup" +) + +const httpSchema = "http://" + +var _ = Describe("BydbQL Command", func() { + var addr string + var deferFunc func() + var rootCmd *cobra.Command + + BeforeEach(func() { + _, addr, deferFunc = setup.EmptyStandalone() + addr = httpSchema + addr + + // Reset viper for clean test state + viper.Reset() + viper.Set("addr", addr) + viper.Set("group", "default") + + // Create root command with all subcommands + rootCmd = &cobra.Command{Use: "root"} + RootCmdFlags(rootCmd) + }) + + AfterEach(func() { + deferFunc() + }) + + It("creates command with correct structure", func() { + // Find bydbql command in root commands + var bydbqlCmd *cobra.Command + for _, cmd := range rootCmd.Commands() { + if cmd.Use == "bydbql" { + bydbqlCmd = cmd + break + } + } + + Expect(bydbqlCmd).NotTo(BeNil()) + Expect(bydbqlCmd.Use).To(Equal("bydbql")) + + // Test subcommands + subCommands := bydbqlCmd.Commands() + Expect(subCommands).To(HaveLen(2)) + + // Check that the expected subcommands exist (order may vary) + subCommandNames := make([]string, len(subCommands)) + for i, cmd := range subCommands { + subCommandNames[i] = cmd.Use + } + 
Expect(subCommandNames).To(ContainElements("interactive", "query")) + }) +}) + +var _ = Describe("Endpoint Determination", func() { + DescribeTable("determines correct endpoint for different query types", + func(query, expectedEndpoint string) { + parsed, errors := bydbql.ParseQuery(query) + Expect(errors).To(BeEmpty()) + + endpoint, err := determineEndpoint(parsed) + Expect(err).NotTo(HaveOccurred()) + Expect(endpoint).To(Equal(expectedEndpoint)) + }, + Entry("stream query", "SELECT * FROM STREAM sw", streamQueryPath), + Entry("measure query", "SELECT * FROM MEASURE metrics", measureQueryPath), + Entry("trace query", "SELECT * FROM TRACE traces", traceQueryPath), + Entry("property query", "SELECT * FROM PROPERTY props", propertyQueryPath), + Entry("top-n query", "SHOW TOP 10 FROM MEASURE metrics", topnQueryPath), + ) +}) + +var _ = Describe("Query Execution Parse Errors", func() { + DescribeTable("handles parsing errors correctly", + func(query string) { + err := executeBydbQLQuery(query) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("parsing errors")) + }, + Entry("incomplete SELECT", "SELECT"), + Entry("missing resource", "SELECT * FROM"), + Entry("missing N in TOP", "SHOW TOP FROM MEASURE metrics"), + ) +}) + +var _ = Describe("Valid Query Parsing", func() { + DescribeTable("parses valid queries correctly", + func(query string) { + // Parse and validate query structure + parsed, errors := bydbql.ParseQuery(query) + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + // Validate endpoint determination + _, err := determineEndpoint(parsed) + Expect(err).NotTo(HaveOccurred()) + }, + Entry("stream query", "SELECT * FROM STREAM sw"), + Entry("measure query with aggregation", "SELECT region, SUM(latency) FROM MEASURE metrics GROUP BY region"), + Entry("top-n query", "SHOW TOP 10 FROM MEASURE service_latency ORDER BY value DESC"), + Entry("trace query with condition", "SELECT * FROM TRACE traces WHERE status = 'error'"), + 
Entry("property query with condition", "SELECT ip, owner FROM PROPERTY metadata WHERE datacenter = 'dc-1'"), + ) +}) + +var _ = Describe("Help Command", func() { + var addr string + var deferFunc func() + var rootCmd *cobra.Command + + BeforeEach(func() { + _, addr, deferFunc = setup.EmptyStandalone() + addr = httpSchema + addr + + // Reset viper for clean test state + viper.Reset() + viper.Set("addr", addr) + viper.Set("group", "default") + + // Create root command with all subcommands + rootCmd = &cobra.Command{Use: "root"} + RootCmdFlags(rootCmd) + }) + + AfterEach(func() { + deferFunc() + }) + + It("displays help information correctly", func() { + // Capture help output + var output bytes.Buffer + rootCmd.SetOut(&output) + + // Execute help for bydbql command + rootCmd.SetArgs([]string{"bydbql", "--help"}) + err := rootCmd.Execute() + Expect(err).NotTo(HaveOccurred()) + + helpOutput := output.String() + + // Verify help content includes key information + expectedContent := []string{ + "BanyanDB Query Language", + "BydbQL", + "interactive", + "query", + "SQL-like", + } + + for _, expected := range expectedContent { + Expect(helpOutput).To(ContainSubstring(expected)) + } + }) +}) + +var _ = Describe("BydbQL Integration", func() { + DescribeTable("full pipeline processing", + func(query string, resourceType bydbql.ResourceType) { + // Step 1: Parse query + parsed, errors := bydbql.ParseQuery(query) + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + // Step 2: Validate resource type detection + if parsed.ResourceType != resourceType { + // For auto-detected resource types, this might be expected + GinkgoWriter.Printf("Resource type: expected %s, got %s (may be auto-detected)", + resourceType.String(), parsed.ResourceType.String()) + } + + // Step 3: Translate to YAML + context := &bydbql.QueryContext{ + DefaultGroup: "default", + } + translator := bydbql.NewTranslator(context) + yamlData, err := translator.TranslateToYAML(parsed) + 
Expect(err).NotTo(HaveOccurred()) + Expect(yamlData).NotTo(BeEmpty()) + + // Step 4: Determine correct endpoint + endpoint, err := determineEndpoint(parsed) + Expect(err).NotTo(HaveOccurred()) + + // Step 5: Validate endpoint matches expected resource type + expectedEndpoints := map[bydbql.ResourceType]string{ + bydbql.ResourceTypeStream: streamQueryPath, + bydbql.ResourceTypeMeasure: measureQueryPath, + bydbql.ResourceTypeTrace: traceQueryPath, + bydbql.ResourceTypeProperty: propertyQueryPath, + } + + // For Top-N queries, always use measure endpoint + if _, isTopN := parsed.Statement.(*bydbql.TopNStatement); isTopN { + Expect(endpoint).To(Equal(topnQueryPath)) + } else if expectedEndpoint, ok := expectedEndpoints[resourceType]; ok { + Expect(endpoint).To(Equal(expectedEndpoint)) + } + + // Step 6: Validate YAML structure + var yamlMap map[string]interface{} + err = yaml.Unmarshal(yamlData, &yamlMap) + Expect(err).NotTo(HaveOccurred()) + + // Basic validation: should have name or resource identifier + Expect(yamlMap["Name"] != nil || yamlMap["topN"] != nil).To(BeTrue()) + + GinkgoWriter.Printf("Successfully processed query: %s", query) + GinkgoWriter.Printf("Generated YAML length: %d bytes", len(yamlData)) + GinkgoWriter.Printf("Target endpoint: %s", endpoint) + }, + Entry("stream query full pipeline", + "SELECT trace_id, service_id FROM STREAM sw WHERE service_id = 'webapp' TIME > '-30m' LIMIT 100", + bydbql.ResourceTypeStream), + Entry("measure query full pipeline", + "SELECT region, AVG(latency) FROM MEASURE service_metrics WHERE service = 'auth' GROUP BY region", + bydbql.ResourceTypeMeasure), + Entry("topn query full pipeline", + "SHOW TOP 5 FROM MEASURE error_count WHERE status_code = '500'", + bydbql.ResourceTypeMeasure), + Entry("trace query full pipeline", + "SELECT * FROM TRACE app_traces WHERE operation_name = 'GET /api/users' TIME BETWEEN '-1h' AND 'now'", + bydbql.ResourceTypeTrace), + Entry("property query full pipeline", + "SELECT ip, region FROM 
PROPERTY server_info WHERE ID = 'server-1'", + bydbql.ResourceTypeProperty), + ) +}) + +var _ = Describe("Command Line Flags", func() { + var addr string + var deferFunc func() + var rootCmd *cobra.Command + + BeforeEach(func() { + _, addr, deferFunc = setup.EmptyStandalone() + addr = httpSchema + addr + + // Reset viper for clean test state + viper.Reset() + viper.Set("addr", addr) + viper.Set("group", "default") + + // Create root command with all subcommands + rootCmd = &cobra.Command{Use: "root"} + RootCmdFlags(rootCmd) + }) + + AfterEach(func() { + deferFunc() + }) + + DescribeTable("handles command line flags correctly", + func(args []string, expectError bool) { + // Prepend "bydbql" to the args since we're testing the bydbql subcommand + fullArgs := append([]string{"bydbql"}, args...) + rootCmd.SetArgs(fullArgs) + + // We're testing command structure, not execution + // So we expect parsing to succeed even if execution might fail + err := rootCmd.ParseFlags(fullArgs) + + if expectError { + Expect(err).To(HaveOccurred()) + } else { + Expect(err).NotTo(HaveOccurred()) + } + }, + Entry("valid query command with file", []string{"query", "-f", "test.sql"}, false), + Entry("valid interactive command", []string{"interactive"}, false), + Entry("alias i for interactive", []string{"i"}, false), + ) +}) + +var _ = Describe("BydbQL Query Execution", func() { + var addr string + var deferFunc func() + var rootCmd *cobra.Command + + BeforeEach(func() { + _, addr, deferFunc = setup.EmptyStandalone() + addr = httpSchema + addr + + // Reset viper for clean test state + viper.Reset() + viper.Set("addr", addr) + viper.Set("group", "default") + + // Create root command with all subcommands + rootCmd = &cobra.Command{Use: "root"} + RootCmdFlags(rootCmd) + }) + + AfterEach(func() { + deferFunc() + }) + + It("executes simple query successfully", func() { + // Test a simple query execution + rootCmd.SetArgs([]string{"bydbql", "query", "-f", "-"}) + 
rootCmd.SetIn(strings.NewReader("SELECT * FROM STREAM sw LIMIT 1")) + + executeQuery := func() string { + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("execution fails: %v", err) + } + }) + } + + Eventually(executeQuery, flags.EventuallyTimeout).ShouldNot(ContainSubstring("error")) + }) +}) diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go index c83f2752..095a995d 100644 --- a/bydbctl/internal/cmd/root.go +++ b/bydbctl/internal/cmd/root.go @@ -80,7 +80,7 @@ func RootCmdFlags(command *cobra.Command) { _ = viper.BindPFlag("password", command.PersistentFlags().Lookup("password")) command.AddCommand(newGroupCmd(), newUseCmd(), newStreamCmd(), newMeasureCmd(), newTopnCmd(), - newIndexRuleCmd(), newIndexRuleBindingCmd(), newPropertyCmd(), newTraceCmd(), newHealthCheckCmd(), newAnalyzeCmd()) + newIndexRuleCmd(), newIndexRuleBindingCmd(), newPropertyCmd(), newTraceCmd(), newHealthCheckCmd(), newAnalyzeCmd(), newBydbQLCmd()) } func init() { diff --git a/pkg/bydbql/ast.go b/pkg/bydbql/ast.go new file mode 100644 index 00000000..0348d1e1 --- /dev/null +++ b/pkg/bydbql/ast.go @@ -0,0 +1,327 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package bydbql provides BanyanDB Query Language (BydbQL) parsing and translation capabilities. +package bydbql + +import "time" + +// Node represents a node in the Abstract Syntax Tree +type Node interface { + String() string +} + +// Statement represents a BydbQL statement +type Statement interface { + Node + statementNode() +} + +// Expression represents a BydbQL expression +type Expression interface { + Node + expressionNode() +} + +// SelectStatement represents a SELECT statement +type SelectStatement struct { + Projection *Projection + From *FromClause + Time *TimeCondition + Where *WhereClause + GroupBy *GroupByClause + OrderBy *OrderByClause + Limit *int + Offset *int + QueryTrace bool +} + +func (s *SelectStatement) statementNode() {} +func (s *SelectStatement) String() string { return "SELECT" } + +// TopNStatement represents a SHOW TOP N statement +type TopNStatement struct { + TopN int + From *FromClause + Time *TimeCondition + Where *WhereClause + AggregateBy *AggregateFunction + OrderBy *OrderByClause + QueryTrace bool +} + +func (s *TopNStatement) statementNode() {} +func (s *TopNStatement) String() string { return "SHOW TOP" } + +// Projection represents the SELECT projection clause +type Projection struct { + All bool + Empty bool // SELECT () for traces + Columns []*Column + TopN *TopNProjection +} + +func (p *Projection) expressionNode() {} +func (p *Projection) String() string { return "projection" } + +// TopNProjection represents TOP N projection +type TopNProjection struct { + N int + Projection *Projection +} + +func (p *TopNProjection) expressionNode() {} +func (p *TopNProjection) String() string { return "TOP N" } + +// Column represents a column in projection +type Column struct { + Name string + Type ColumnType // ::tag or ::field disambiguator + Function *AggregateFunction +} + +func (c *Column) expressionNode() {} +func (c 
*Column) String() string { return c.Name } + +// ColumnType represents the type disambiguator for columns +type ColumnType int + +const ( + ColumnTypeAuto ColumnType = iota + ColumnTypeTag + ColumnTypeField +) + +// AggregateFunction represents aggregate functions (SUM, MEAN, COUNT, MAX, MIN) +type AggregateFunction struct { + Function string // SUM, MEAN, COUNT, MAX, MIN + Column string +} + +func (f *AggregateFunction) expressionNode() {} +func (f *AggregateFunction) String() string { return f.Function } + +// FromClause represents the FROM clause +type FromClause struct { + ResourceType ResourceType + ResourceName string + Groups []string +} + +func (f *FromClause) expressionNode() {} +func (f *FromClause) String() string { return "FROM" } + +// ResourceType represents the type of resource being queried +type ResourceType int + +const ( + ResourceTypeAuto ResourceType = iota + ResourceTypeStream + ResourceTypeMeasure + ResourceTypeTrace + ResourceTypeProperty +) + +func (r ResourceType) String() string { + switch r { + case ResourceTypeStream: + return "STREAM" + case ResourceTypeMeasure: + return "MEASURE" + case ResourceTypeTrace: + return "TRACE" + case ResourceTypeProperty: + return "PROPERTY" + default: + return "AUTO" + } +} + +// TimeCondition represents TIME conditions +type TimeCondition struct { + Operator TimeOperator + Timestamp string + Begin string // for BETWEEN + End string // for BETWEEN +} + +func (t *TimeCondition) expressionNode() {} +func (t *TimeCondition) String() string { return "TIME" } + +// TimeOperator represents time comparison operators +type TimeOperator int + +const ( + TimeOpEqual TimeOperator = iota + TimeOpGreater + TimeOpLess + TimeOpGreaterEqual + TimeOpLessEqual + TimeOpBetween +) + +// WhereClause represents the WHERE clause +type WhereClause struct { + Conditions []*Condition + Logic LogicOperator +} + +func (w *WhereClause) expressionNode() {} +func (w *WhereClause) String() string { return "WHERE" } + +// Condition 
// Condition represents a single condition in a WHERE clause,
// e.g. `service_id = 'webapp'` or `region IN ('us', 'eu')`.
type Condition struct {
	Left     string
	Operator BinaryOperator
	Right    *Value
	Values   []*Value      // for IN, NOT IN
	Logic    LogicOperator // for combining conditions
}

func (c *Condition) expressionNode() {}
func (c *Condition) String() string  { return c.Left }

// BinaryOperator represents binary operators
type BinaryOperator int

// Supported binary operators, in declaration order.
const (
	OpEqual BinaryOperator = iota
	OpNotEqual
	OpGreater
	OpLess
	OpGreaterEqual
	OpLessEqual
	OpIn
	OpNotIn
	OpHaving
	OpNotHaving
	OpMatch
)

// String returns the BydbQL textual form of the operator.
func (op BinaryOperator) String() string {
	switch op {
	case OpEqual:
		return "="
	case OpNotEqual:
		return "!="
	case OpGreater:
		return ">"
	case OpLess:
		return "<"
	case OpGreaterEqual:
		return ">="
	case OpLessEqual:
		return "<="
	case OpIn:
		return "IN"
	case OpNotIn:
		return "NOT IN"
	case OpHaving:
		return "HAVING"
	case OpNotHaving:
		return "NOT HAVING"
	case OpMatch:
		return "MATCH"
	default:
		return "UNKNOWN"
	}
}

// LogicOperator represents logical operators (AND, OR)
type LogicOperator int

const (
	LogicAnd LogicOperator = iota
	LogicOr
)

// String returns the BydbQL textual form of the logical operator.
func (op LogicOperator) String() string {
	switch op {
	case LogicAnd:
		return "AND"
	case LogicOr:
		return "OR"
	default:
		return "UNKNOWN"
	}
}

// Value represents a value in conditions
type Value struct {
	Type      ValueType
	StringVal string
	Integer   int64
	IsNull    bool
}

func (v *Value) expressionNode() {}

// String renders the value as it would appear in a query.
func (v *Value) String() string {
	if v.IsNull {
		return "NULL"
	}
	switch v.Type {
	case ValueTypeString:
		return v.StringVal
	case ValueTypeInteger:
		// BUG FIX: the previous string(rune(v.Integer)) produced the Unicode
		// character at code point v.Integer (e.g. 65 -> "A"), not its decimal
		// representation.
		return formatInt64(v.Integer)
	default:
		return ""
	}
}

// formatInt64 renders n in base 10. A local helper is used because this file
// imports only "time"; it handles the full int64 range, including MinInt64.
func formatInt64(n int64) string {
	if n == 0 {
		return "0"
	}
	// Work on the unsigned magnitude so negating MinInt64 cannot overflow.
	neg := n < 0
	var u uint64
	if neg {
		u = uint64(-(n + 1)) + 1
	} else {
		u = uint64(n)
	}
	var buf [20]byte // 19 digits max for int64, plus an optional sign
	i := len(buf)
	for u > 0 {
		i--
		buf[i] = byte('0' + u%10)
		u /= 10
	}
	if neg {
		i--
		buf[i] = '-'
	}
	return string(buf[i:])
}

// ValueType represents the type of a value
type ValueType int

const (
	ValueTypeString ValueType = iota
	ValueTypeInteger
	ValueTypeNull
)

// GroupByClause represents GROUP BY clause
type GroupByClause struct {
	Columns []string
}

func (g *GroupByClause) expressionNode() {}
+func (g *GroupByClause) String() string { return "GROUP BY" } + +// OrderByClause represents ORDER BY clause +type OrderByClause struct { + Column string + Desc bool +} + +func (o *OrderByClause) expressionNode() {} +func (o *OrderByClause) String() string { return "ORDER BY" } + +// ParsedQuery represents a parsed BydbQL query with metadata +type ParsedQuery struct { + Statement Statement + ResourceType ResourceType + ResourceName string + Groups []string + Context *QueryContext +} + +// QueryContext provides execution context for queries +type QueryContext struct { + DefaultGroup string + DefaultResourceName string + DefaultResourceType ResourceType + CurrentTime time.Time +} \ No newline at end of file diff --git a/pkg/bydbql/bydbql_suite_test.go b/pkg/bydbql/bydbql_suite_test.go new file mode 100644 index 00000000..623ecab8 --- /dev/null +++ b/pkg/bydbql/bydbql_suite_test.go @@ -0,0 +1,30 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package bydbql_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestBydbql(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Bydbql Suite") +} \ No newline at end of file diff --git a/pkg/bydbql/bydbql_test.go b/pkg/bydbql/bydbql_test.go new file mode 100644 index 00000000..57e8d8f8 --- /dev/null +++ b/pkg/bydbql/bydbql_test.go @@ -0,0 +1,465 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package bydbql_test + +import ( + "fmt" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/yaml" + + . 
"github.com/apache/skywalking-banyandb/pkg/bydbql" +) + +var _ = Describe("Lexer", func() { + DescribeTable("tokenizes queries correctly", + func(input string, expected []TokenType) { + lexer := NewLexer(input) + tokens := lexer.GetAllTokens() + + Expect(tokens).To(HaveLen(len(expected))) + + for i, expectedToken := range expected { + Expect(tokens[i].Type).To(Equal(expectedToken), + "token %d: expected %s, got %s", i, expectedToken.String(), tokens[i].Type.String()) + } + }, + Entry("basic SELECT query", + "SELECT * FROM STREAM sw", + []TokenType{TokenSelect, TokenStar, TokenFrom, TokenStream, TokenIdentifier, TokenEOF}), + Entry("SELECT with WHERE clause", + "SELECT trace_id FROM STREAM sw WHERE service_id = 'webapp'", + []TokenType{ + TokenSelect, TokenIdentifier, TokenFrom, TokenStream, TokenIdentifier, + TokenWhere, TokenIdentifier, TokenEqual, TokenString, TokenEOF, + }), + Entry("TOP N query", + "SHOW TOP 10 FROM MEASURE service_latency", + []TokenType{TokenShow, TokenTop, TokenInteger, TokenFrom, TokenMeasure, TokenIdentifier, TokenEOF}), + Entry("aggregate function", + "SELECT SUM(latency) FROM MEASURE metrics", + []TokenType{ + TokenSelect, TokenSum, TokenLeftParen, TokenIdentifier, TokenRightParen, + TokenFrom, TokenMeasure, TokenIdentifier, TokenEOF, + }), + Entry("time range", + "TIME BETWEEN '2023-01-01T00:00:00Z' AND '2023-01-02T00:00:00Z'", + []TokenType{TokenTime, TokenBetween, TokenString, TokenAnd, TokenString, TokenEOF}), + Entry("column type disambiguator", + "SELECT status::tag, status::field FROM MEASURE metrics", + []TokenType{ + TokenSelect, TokenIdentifier, TokenDoubleColon, TokenIdentifier, + TokenComma, TokenIdentifier, TokenDoubleColon, TokenIdentifier, + TokenFrom, TokenMeasure, TokenIdentifier, TokenEOF, + }), + ) +}) + +var _ = Describe("Parser", func() { + Describe("valid queries", func() { + It("parses simple SELECT *", func() { + parsed, errors := ParseQuery("SELECT *") + Expect(errors).To(BeEmpty()) + 
Expect(parsed).NotTo(BeNil()) + + stmt, ok := parsed.Statement.(*SelectStatement) + Expect(ok).To(BeTrue()) + Expect(stmt.Projection).NotTo(BeNil()) + Expect(stmt.Projection.All).To(BeTrue()) + }) + + It("parses SELECT with FROM clause", func() { + parsed, errors := ParseQuery("SELECT * FROM STREAM sw") + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + stmt, ok := parsed.Statement.(*SelectStatement) + Expect(ok).To(BeTrue()) + Expect(stmt.From).NotTo(BeNil()) + Expect(stmt.From.ResourceType).To(Equal(ResourceTypeStream)) + Expect(stmt.From.ResourceName).To(Equal("sw")) + }) + + It("parses SELECT with WHERE clause", func() { + parsed, errors := ParseQuery("SELECT trace_id FROM STREAM sw WHERE service_id = 'webapp'") + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + stmt, ok := parsed.Statement.(*SelectStatement) + Expect(ok).To(BeTrue()) + Expect(stmt.Where).NotTo(BeNil()) + Expect(stmt.Where.Conditions).To(HaveLen(1)) + Expect(stmt.Where.Conditions[0].Left).To(Equal("service_id")) + }) + + It("parses SELECT with TIME condition", func() { + parsed, errors := ParseQuery("SELECT * FROM STREAM sw TIME > '-30m'") + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + stmt, ok := parsed.Statement.(*SelectStatement) + Expect(ok).To(BeTrue()) + Expect(stmt.Time).NotTo(BeNil()) + Expect(stmt.Time.Operator).To(Equal(TimeOpGreater)) + }) + + It("parses SELECT with GROUP BY", func() { + parsed, errors := ParseQuery("SELECT region, SUM(latency) FROM MEASURE metrics GROUP BY region") + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + stmt, ok := parsed.Statement.(*SelectStatement) + Expect(ok).To(BeTrue()) + Expect(stmt.GroupBy).NotTo(BeNil()) + Expect(stmt.GroupBy.Columns).To(HaveLen(1)) + Expect(stmt.GroupBy.Columns[0]).To(Equal("region")) + }) + + It("parses TOP N statement", func() { + parsed, errors := ParseQuery("SHOW TOP 10 FROM MEASURE service_latency ORDER BY value DESC") + Expect(errors).To(BeEmpty()) + 
Expect(parsed).NotTo(BeNil()) + + stmt, ok := parsed.Statement.(*TopNStatement) + Expect(ok).To(BeTrue()) + Expect(stmt.TopN).To(Equal(10)) + Expect(stmt.OrderBy).NotTo(BeNil()) + Expect(stmt.OrderBy.Desc).To(BeTrue()) + }) + + It("parses empty projection for traces", func() { + parsed, errors := ParseQuery("SELECT () FROM TRACE sw_trace") + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + stmt, ok := parsed.Statement.(*SelectStatement) + Expect(ok).To(BeTrue()) + Expect(stmt.Projection).NotTo(BeNil()) + Expect(stmt.Projection.Empty).To(BeTrue()) + }) + + It("parses WITH QUERY_TRACE", func() { + parsed, errors := ParseQuery("SELECT * FROM STREAM sw WITH QUERY_TRACE") + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + stmt, ok := parsed.Statement.(*SelectStatement) + Expect(ok).To(BeTrue()) + Expect(stmt.QueryTrace).To(BeTrue()) + }) + }) + + Describe("invalid queries", func() { + It("handles invalid syntax", func() { + parsed, errors := ParseQuery("SELECT FROM") + Expect(errors).NotTo(BeEmpty()) + Expect(parsed).To(BeNil()) + }) + }) +}) + +var _ = Describe("Translator", func() { + var context *QueryContext + + BeforeEach(func() { + context = &QueryContext{ + DefaultGroup: "default", + DefaultResourceName: "test_resource", + DefaultResourceType: ResourceTypeStream, + CurrentTime: time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + } + }) + + DescribeTable("translates queries correctly", + func(input string, validateFunc func(map[string]any) bool) { + parsed, errors := ParseQuery(input) + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + translator := NewTranslator(context) + data, err := translator.TranslateToMap(parsed) + Expect(err).NotTo(HaveOccurred()) + Expect(validateFunc(data)).To(BeTrue(), "validation failed for data: %+v", data) + }, + Entry("simple stream query", + "SELECT * FROM STREAM sw", + func(data map[string]any) bool { + return data["Name"] == "sw" + }), + Entry("measure query with aggregation", + "SELECT 
region, SUM(latency) FROM MEASURE metrics GROUP BY region", + func(data map[string]any) bool { + agg, ok := data["Agg"].(map[string]any) + return ok && agg["Function"] == "SUM" && agg["FieldName"] == "latency" + }), + Entry("time range query", + "SELECT * FROM STREAM sw TIME BETWEEN '2023-01-01T00:00:00Z' AND '2023-01-02T00:00:00Z'", + func(data map[string]any) bool { + timeRange, ok := data["TimeRange"].(map[string]any) + return ok && timeRange["Begin"] == "2023-01-01T00:00:00Z" && + timeRange["End"] == "2023-01-02T00:00:00Z" + }), + Entry("relative time query", + "SELECT * FROM STREAM sw TIME > '-30m'", + func(data map[string]any) bool { + timeRange, ok := data["TimeRange"].(map[string]any) + return ok && timeRange["Begin"] != nil && timeRange["End"] != nil + }), + Entry("WHERE clause with criteria", + "SELECT * FROM STREAM sw WHERE service_id = 'webapp'", + func(data map[string]any) bool { + criteria, ok := data["Criteria"].([]any) + if !ok || len(criteria) == 0 { + return false + } + first, ok := criteria[0].(map[string]any) + return ok && first["tagName"] == "service_id" && first["op"] == "BINARY_OP_EQ" + }), + Entry("TOP N query", + "SHOW TOP 10 FROM MEASURE service_latency ORDER BY value DESC", + func(data map[string]any) bool { + topN := data["TopN"] + fieldValueSort := data["FieldValueSort"] + return topN == float64(10) && fieldValueSort == "DESC" + }), + Entry("property query with IDs", + "SELECT * FROM PROPERTY metadata WHERE ID = 'id1' OR ID = 'id2'", + func(data map[string]any) bool { + criteria, ok := data["Criteria"].([]any) + return ok && len(criteria) == 2 + }), + Entry("query trace enabled", + "SELECT * FROM STREAM sw WITH QUERY_TRACE", + func(data map[string]any) bool { + return data["Trace"] == true + }), + ) +}) + +var _ = Describe("Complex Queries", func() { + var context *QueryContext + + BeforeEach(func() { + context = &QueryContext{ + DefaultGroup: "default", + CurrentTime: time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + } + }) + + 
complexQueries := []string{ + // Stream queries + `SELECT trace_id, service_id, start_time + FROM STREAM sw IN (default, updated) + WHERE service_id = 'webapp' AND state = 1 + ORDER BY start_time DESC + LIMIT 100`, + + // Measure queries with complex projections + `SELECT region, SUM(latency) + FROM MEASURE service_cpm IN (us-west, us-east) + TIME BETWEEN '-2h' AND 'now' + WHERE service = 'auth-service' + GROUP BY region`, + + // Trace queries with empty projection + `SELECT () + FROM TRACE sw_trace + TIME > '-1h' + WHERE status = 'error' + WITH QUERY_TRACE + LIMIT 50`, + + // Property queries + `SELECT ip, region, owner + FROM PROPERTY server_metadata IN (datacenter-1, datacenter-2) + WHERE datacenter = 'dc-101' + LIMIT 50`, + + // Top-N queries + `SHOW TOP 5 + FROM MEASURE service_errors IN (production, staging) + TIME BETWEEN '-24h' AND 'now' + WHERE status_code = '500' + ORDER BY value DESC`, + } + + for i, query := range complexQueries { + It(fmt.Sprintf("parses and translates complex query %d", i+1), func() { + // Parse the query + parsed, errors := ParseQuery(query) + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + // Translate to YAML + translator := NewTranslator(context) + yamlData, err := translator.TranslateToYAML(parsed) + Expect(err).NotTo(HaveOccurred()) + Expect(yamlData).NotTo(BeEmpty()) + + // Validate YAML structure by unmarshaling + var result map[string]any + err = yaml.Unmarshal(yamlData, &result) + Expect(err).NotTo(HaveOccurred()) + }) + } +}) + +var _ = Describe("Error Handling", func() { + invalidQueries := []string{ + "SELECT", // incomplete query + "SELECT * FROM", // missing resource + "SELECT * FROM INVALID sw", // invalid resource type + "SHOW TOP FROM MEASURE metrics", // missing N + "SELECT * WHERE service_id", // incomplete condition + "TIME > '2023-01-01'", // missing SELECT + "SELECT * FROM STREAM sw GROUP BY", // incomplete GROUP BY + } + + for i, query := range invalidQueries { + It(fmt.Sprintf("handles invalid 
query %d", i+1), func() { + parsed, errors := ParseQuery(query) + + // Should have parsing errors + Expect(errors).NotTo(BeEmpty(), "expected parsing errors for query: %s", query) + + // Parsed query should be nil for invalid queries + Expect(parsed).To(BeNil(), "expected nil parsed query for invalid query: %s", query) + }) + } +}) + +var _ = Describe("Time Format Parsing", func() { + var context *QueryContext + + BeforeEach(func() { + context = &QueryContext{ + CurrentTime: time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + } + }) + + DescribeTable("parses time formats correctly in queries", + func(timeCondition string, validateFunc func(map[string]any) bool) { + query := fmt.Sprintf("SELECT * FROM STREAM sw TIME %s", timeCondition) + parsed, errors := ParseQuery(query) + Expect(errors).To(BeEmpty()) + Expect(parsed).NotTo(BeNil()) + + translator := NewTranslator(context) + data, err := translator.TranslateToMap(parsed) + Expect(err).NotTo(HaveOccurred()) + Expect(validateFunc(data)).To(BeTrue(), "time validation failed for data: %+v", data) + }, + Entry("absolute time range", + "BETWEEN '2023-01-01T10:00:00Z' AND '2023-01-01T11:00:00Z'", + func(data map[string]any) bool { + timeRange, ok := data["TimeRange"].(map[string]any) + return ok && timeRange["Begin"] == "2023-01-01T10:00:00Z" && + timeRange["End"] == "2023-01-01T11:00:00Z" + }), + Entry("relative time condition", + "> '-30m'", + func(data map[string]any) bool { + timeRange, ok := data["TimeRange"].(map[string]any) + return ok && timeRange["Begin"] != nil && timeRange["End"] != nil + }), + ) +}) + +var _ = Describe("Case Insensitivity", func() { + // Test that keywords are case-insensitive + queries := []string{ + "SELECT * FROM STREAM sw", + "select * from stream sw", + "Select * From Stream sw", + "sElEcT * fRoM sTrEaM sw", + } + + for i, query := range queries { + It(fmt.Sprintf("parses case-insensitive query %d", i+1), func() { + parsed, errors := ParseQuery(query) + Expect(errors).To(BeEmpty(), "parsing 
errors for query '%s': %v", query, errors) + Expect(parsed).NotTo(BeNil(), "failed to parse case-insensitive query: %s", query) + }) + } +}) + +// Benchmark tests +func BenchmarkLexer(b *testing.B) { + query := `SELECT trace_id, service_id, start_time + FROM STREAM sw IN (default, updated) + WHERE service_id = 'webapp' AND state = 1 + ORDER BY start_time DESC + LIMIT 100` + + for i := 0; i < b.N; i++ { + lexer := NewLexer(query) + _ = lexer.GetAllTokens() + } +} + +func BenchmarkParser(b *testing.B) { + query := `SELECT trace_id, service_id, start_time + FROM STREAM sw IN (default, updated) + WHERE service_id = 'webapp' AND state = 1 + ORDER BY start_time DESC + LIMIT 100` + + for i := 0; i < b.N; i++ { + _, _ = ParseQuery(query) + } +} + +func BenchmarkTranslator(b *testing.B) { + query := `SELECT trace_id, service_id, start_time + FROM STREAM sw IN (default, updated) + WHERE service_id = 'webapp' AND state = 1 + ORDER BY start_time DESC + LIMIT 100` + + context := &QueryContext{ + CurrentTime: time.Now(), + } + + // Parse once + parsed, _ := ParseQuery(query) + translator := NewTranslator(context) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = translator.TranslateToYAML(parsed) + } +} + +func BenchmarkEndToEnd(b *testing.B) { + query := `SELECT trace_id, service_id, start_time + FROM STREAM sw IN (default, updated) + WHERE service_id = 'webapp' AND state = 1 + ORDER BY start_time DESC + LIMIT 100` + + context := &QueryContext{ + CurrentTime: time.Now(), + } + + for i := 0; i < b.N; i++ { + _, _, _ = TranslateQuery(query, context) + } +} diff --git a/pkg/bydbql/lexer.go b/pkg/bydbql/lexer.go new file mode 100644 index 00000000..a3acee3f --- /dev/null +++ b/pkg/bydbql/lexer.go @@ -0,0 +1,608 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package bydbql + +import ( + "fmt" + "strconv" + "strings" + "unicode" + "unicode/utf8" +) + +// TokenType represents the type of a token +type TokenType int + +const ( + // Special tokens + TokenEOF TokenType = iota + TokenIllegal + + // Literals + TokenIdentifier + TokenString + TokenInteger + + // Keywords + TokenSelect + TokenFrom + TokenWhere + TokenOrderBy + TokenGroupBy + TokenLimit + TokenOffset + TokenTime + TokenBetween + TokenAnd + TokenOr + TokenIn + TokenNotIn + TokenHaving + TokenNotHaving + TokenMatch + TokenShow + TokenTop + TokenStream + TokenMeasure + TokenTrace + TokenProperty + TokenAsc + TokenDesc + TokenWith + TokenQueryTrace + TokenNull + TokenAggregateBy + + // Aggregate functions + TokenSum + TokenMean + TokenAvg + TokenCount + TokenMax + TokenMin + + // Operators + TokenEqual + TokenNotEqual + TokenGreater + TokenLess + TokenGreaterEqual + TokenLessEqual + + // Punctuation + TokenComma + TokenSemicolon + TokenLeftParen + TokenRightParen + TokenDot + TokenDoubleColon + + // Special literals + TokenStar + TokenBy + TokenValue +) + +// Token represents a lexical token +type Token struct { + Type TokenType + Literal string + Line int + Column int + Position int +} + +func (t Token) String() string { + return fmt.Sprintf("Token{Type: %s, Literal: %s, Line: %d, Column: %d}", + t.Type.String(), t.Literal, t.Line, t.Column) +} + +func (tt 
TokenType) String() string { + switch tt { + case TokenEOF: + return "EOF" + case TokenIllegal: + return "ILLEGAL" + case TokenIdentifier: + return "IDENTIFIER" + case TokenString: + return "STRING" + case TokenInteger: + return "INTEGER" + case TokenSelect: + return "SELECT" + case TokenFrom: + return "FROM" + case TokenWhere: + return "WHERE" + case TokenOrderBy: + return "ORDER BY" + case TokenGroupBy: + return "GROUP BY" + case TokenLimit: + return "LIMIT" + case TokenOffset: + return "OFFSET" + case TokenTime: + return "TIME" + case TokenBetween: + return "BETWEEN" + case TokenAnd: + return "AND" + case TokenOr: + return "OR" + case TokenIn: + return "IN" + case TokenNotIn: + return "NOT IN" + case TokenHaving: + return "HAVING" + case TokenNotHaving: + return "NOT HAVING" + case TokenMatch: + return "MATCH" + case TokenShow: + return "SHOW" + case TokenTop: + return "TOP" + case TokenStream: + return "STREAM" + case TokenMeasure: + return "MEASURE" + case TokenTrace: + return "TRACE" + case TokenProperty: + return "PROPERTY" + case TokenAsc: + return "ASC" + case TokenDesc: + return "DESC" + case TokenWith: + return "WITH" + case TokenQueryTrace: + return "QUERY_TRACE" + case TokenNull: + return "NULL" + case TokenAggregateBy: + return "AGGREGATE BY" + case TokenSum: + return "SUM" + case TokenMean: + return "MEAN" + case TokenAvg: + return "AVG" + case TokenCount: + return "COUNT" + case TokenMax: + return "MAX" + case TokenMin: + return "MIN" + case TokenEqual: + return "=" + case TokenNotEqual: + return "!=" + case TokenGreater: + return ">" + case TokenLess: + return "<" + case TokenGreaterEqual: + return ">=" + case TokenLessEqual: + return "<=" + case TokenComma: + return "," + case TokenSemicolon: + return ";" + case TokenLeftParen: + return "(" + case TokenRightParen: + return ")" + case TokenDot: + return "." 
+ case TokenDoubleColon: + return "::" + case TokenStar: + return "*" + case TokenBy: + return "BY" + case TokenValue: + return "VALUE" + default: + return "UNKNOWN" + } +} + +// keywords maps keyword strings to their token types +var keywords = map[string]TokenType{ + "select": TokenSelect, + "from": TokenFrom, + "where": TokenWhere, + "order": TokenOrderBy, // handled specially as "order by" + "group": TokenGroupBy, // handled specially as "group by" + "limit": TokenLimit, + "offset": TokenOffset, + "time": TokenTime, + "between": TokenBetween, + "and": TokenAnd, + "or": TokenOr, + "in": TokenIn, + "not": TokenNotIn, // handled specially + "having": TokenHaving, + "match": TokenMatch, + "show": TokenShow, + "top": TokenTop, + "stream": TokenStream, + "measure": TokenMeasure, + "trace": TokenTrace, + "property": TokenProperty, + "asc": TokenAsc, + "desc": TokenDesc, + "with": TokenWith, + "query_trace": TokenQueryTrace, + "null": TokenNull, + "aggregate": TokenAggregateBy, // handled specially as "aggregate by" + "sum": TokenSum, + "mean": TokenMean, + "avg": TokenAvg, + "count": TokenCount, + "max": TokenMax, + "min": TokenMin, + "by": TokenBy, + "value": TokenValue, +} + +// Lexer represents a lexical analyzer +type Lexer struct { + input string + position int + readPosition int + ch byte + line int + column int +} + +// NewLexer creates a new lexer instance +func NewLexer(input string) *Lexer { + l := &Lexer{ + input: input, + line: 1, + column: 0, + } + l.readChar() + return l +} + +// readChar reads the next character and advances position +func (l *Lexer) readChar() { + if l.readPosition >= len(l.input) { + l.ch = 0 // EOF + } else { + l.ch = l.input[l.readPosition] + } + l.position = l.readPosition + l.readPosition++ + + if l.ch == '\n' { + l.line++ + l.column = 0 + } else { + l.column++ + } +} + +// peekChar returns the next character without advancing position +func (l *Lexer) peekChar() byte { + if l.readPosition >= len(l.input) { + return 0 + } + return 
l.input[l.readPosition] +} + +// skipWhitespace skips whitespace characters +func (l *Lexer) skipWhitespace() { + for l.ch == ' ' || l.ch == '\t' || l.ch == '\n' || l.ch == '\r' { + l.readChar() + } +} + +// readIdentifier reads an identifier or keyword +func (l *Lexer) readIdentifier() string { + position := l.position + for isLetter(l.ch) || isDigit(l.ch) || l.ch == '_' || l.ch == '-' { + l.readChar() + } + return l.input[position:l.position] +} + +// readNumber reads a numeric literal +func (l *Lexer) readNumber() string { + position := l.position + for isDigit(l.ch) { + l.readChar() + } + return l.input[position:l.position] +} + +// readString reads a string literal +func (l *Lexer) readString(delimiter byte) (string, error) { + position := l.position + 1 // skip opening quote + for { + l.readChar() + if l.ch == delimiter || l.ch == 0 { + break + } + if l.ch == '\\' && l.peekChar() == delimiter { + l.readChar() // skip escaped quote + } + } + + if l.ch == 0 { + return "", fmt.Errorf("unterminated string literal at line %d, column %d", l.line, l.column) + } + + str := l.input[position:l.position] + l.readChar() // skip closing quote + return str, nil +} + +// lookupIdent checks if an identifier is a keyword +func (l *Lexer) lookupIdent(ident string) TokenType { + lower := strings.ToLower(ident) + if tok, ok := keywords[lower]; ok { + // Handle compound keywords + switch lower { + case "order": + if l.peekAhead("by") { + l.consumeKeyword("by") + return TokenOrderBy + } + case "group": + if l.peekAhead("by") { + l.consumeKeyword("by") + return TokenGroupBy + } + case "aggregate": + if l.peekAhead("by") { + l.consumeKeyword("by") + return TokenAggregateBy + } + case "not": + if l.peekAhead("in") { + l.consumeKeyword("in") + return TokenNotIn + } else if l.peekAhead("having") { + l.consumeKeyword("having") + return TokenNotHaving + } + } + return tok + } + return TokenIdentifier +} + +// peekAhead checks if the next identifier matches the expected keyword +func (l 
*Lexer) peekAhead(expected string) bool { + savedPos := l.position + savedReadPos := l.readPosition + savedCh := l.ch + savedLine := l.line + savedColumn := l.column + + l.skipWhitespace() + + if !isLetter(l.ch) { + // Restore lexer state + l.position = savedPos + l.readPosition = savedReadPos + l.ch = savedCh + l.line = savedLine + l.column = savedColumn + return false + } + + ident := l.readIdentifier() + match := strings.EqualFold(ident, expected) + + if !match { + // Restore lexer state + l.position = savedPos + l.readPosition = savedReadPos + l.ch = savedCh + l.line = savedLine + l.column = savedColumn + } else { + // When there's a match, we need to restore the lexer state + // because consumeKeyword will consume the keyword again + l.position = savedPos + l.readPosition = savedReadPos + l.ch = savedCh + l.line = savedLine + l.column = savedColumn + } + + return match +} + +// consumeKeyword consumes the expected keyword +func (l *Lexer) consumeKeyword(expected string) { + l.skipWhitespace() + l.readIdentifier() // consume the keyword +} + +// NextToken returns the next token +func (l *Lexer) NextToken() Token { + var tok Token + tok.Line = l.line + tok.Column = l.column + tok.Position = l.position + + l.skipWhitespace() + + switch l.ch { + case '=': + tok.Type = TokenEqual + tok.Literal = "=" + case '!': + if l.peekChar() == '=' { + ch := l.ch + l.readChar() + tok.Type = TokenNotEqual + tok.Literal = string(ch) + string(l.ch) + } else { + tok.Type = TokenIllegal + tok.Literal = string(l.ch) + } + case '>': + if l.peekChar() == '=' { + ch := l.ch + l.readChar() + tok.Type = TokenGreaterEqual + tok.Literal = string(ch) + string(l.ch) + } else { + tok.Type = TokenGreater + tok.Literal = ">" + } + case '<': + if l.peekChar() == '=' { + ch := l.ch + l.readChar() + tok.Type = TokenLessEqual + tok.Literal = string(ch) + string(l.ch) + } else { + tok.Type = TokenLess + tok.Literal = "<" + } + case ',': + tok.Type = TokenComma + tok.Literal = "," + case ';': + 
tok.Type = TokenSemicolon + tok.Literal = ";" + case '(': + tok.Type = TokenLeftParen + tok.Literal = "(" + case ')': + tok.Type = TokenRightParen + tok.Literal = ")" + case '.': + tok.Type = TokenDot + tok.Literal = "." + case ':': + if l.peekChar() == ':' { + l.readChar() + tok.Type = TokenDoubleColon + tok.Literal = "::" + } else { + tok.Type = TokenIllegal + tok.Literal = ":" + } + case '*': + tok.Type = TokenStar + tok.Literal = "*" + case '\'', '"': + str, err := l.readString(l.ch) + if err != nil { + tok.Type = TokenIllegal + tok.Literal = err.Error() + } else { + tok.Type = TokenString + tok.Literal = str + } + case 0: + tok.Type = TokenEOF + tok.Literal = "" + default: + if isLetter(l.ch) { + tok.Literal = l.readIdentifier() + tok.Type = l.lookupIdent(tok.Literal) + return tok // return early to avoid readChar() + } else if isDigit(l.ch) { + tok.Type = TokenInteger + tok.Literal = l.readNumber() + return tok // return early to avoid readChar() + } else { + tok.Type = TokenIllegal + tok.Literal = string(l.ch) + } + } + + l.readChar() + return tok +} + +// GetAllTokens returns all tokens from the input +func (l *Lexer) GetAllTokens() []Token { + var tokens []Token + for { + tok := l.NextToken() + tokens = append(tokens, tok) + if tok.Type == TokenEOF { + break + } + } + return tokens +} + +// isLetter checks if the character is a letter +func isLetter(ch byte) bool { + return 'a' <= ch && ch <= 'z' || 'A' <= ch && ch <= 'Z' || ch == '_' +} + +// isDigit checks if the character is a digit +func isDigit(ch byte) bool { + return '0' <= ch && ch <= '9' +} + +// isAlphaNumeric checks if the character is alphanumeric +func isAlphaNumeric(ch byte) bool { + return isLetter(ch) || isDigit(ch) +} + +// ParseInteger parses a token literal as integer +func ParseInteger(literal string) (int64, error) { + return strconv.ParseInt(literal, 10, 64) +} + +// IsUnicodeIdentifier checks if a rune can be part of a Unicode identifier +func IsUnicodeIdentifier(r rune) bool { + 
return unicode.IsLetter(r) || unicode.IsDigit(r) || r == '_' +} + +// IsUnicodeIdentifierStart checks if a rune can start a Unicode identifier +func IsUnicodeIdentifierStart(r rune) bool { + return unicode.IsLetter(r) || r == '_' +} + +// ReadUnicodeIdentifier reads a Unicode identifier +func (l *Lexer) ReadUnicodeIdentifier() string { + start := l.position + for l.position < len(l.input) { + r, size := utf8.DecodeRuneInString(l.input[l.position:]) + if r == utf8.RuneError || !IsUnicodeIdentifier(r) { + break + } + l.position += size + l.readPosition = l.position + 1 + if l.position < len(l.input) { + l.ch = l.input[l.position] + } else { + l.ch = 0 + } + } + return l.input[start:l.position] +} \ No newline at end of file diff --git a/pkg/bydbql/parser.go b/pkg/bydbql/parser.go new file mode 100644 index 00000000..07b3610a --- /dev/null +++ b/pkg/bydbql/parser.go @@ -0,0 +1,796 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package bydbql + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +// Parser represents a BydbQL parser +type Parser struct { + lexer *Lexer + errors []string + + currentToken Token + peekToken Token +} + +// NewParser creates a new parser instance +func NewParser(lexer *Lexer) *Parser { + p := &Parser{ + lexer: lexer, + errors: []string{}, + } + + // Read two tokens, so currentToken and peekToken are both set + p.nextToken() + p.nextToken() + + return p +} + +// nextToken advances to the next token +func (p *Parser) nextToken() { + p.currentToken = p.peekToken + p.peekToken = p.lexer.NextToken() +} + +// addError adds a parsing error +func (p *Parser) addError(msg string, args ...interface{}) { + p.errors = append(p.errors, fmt.Sprintf(msg, args...)) +} + +// Errors returns parsing errors +func (p *Parser) Errors() []string { + return p.errors +} + +// expectToken checks if the current token matches expected type and advances +func (p *Parser) expectToken(tokenType TokenType) bool { + if p.currentTokenIs(tokenType) { + p.nextToken() + return true + } + + p.addError("expected token %s, got %s at line %d, column %d", + tokenType.String(), p.currentToken.Type.String(), + p.currentToken.Line, p.currentToken.Column) + return false +} + +// currentTokenIs checks if current token matches the given type +func (p *Parser) currentTokenIs(tokenType TokenType) bool { + return p.currentToken.Type == tokenType +} + +// peekTokenIs checks if peek token matches the given type +func (p *Parser) peekTokenIs(tokenType TokenType) bool { + return p.peekToken.Type == tokenType +} + +// Parse parses the BydbQL query and returns a parsed query +func (p *Parser) Parse() *ParsedQuery { + var stmt Statement + + switch p.currentToken.Type { + case TokenSelect: + stmt = p.parseSelectStatement() + case TokenShow: + stmt = p.parseTopNStatement() + default: + p.addError("unexpected token %s at line %d, column %d. 
Expected SELECT or SHOW", + p.currentToken.Type.String(), p.currentToken.Line, p.currentToken.Column) + return nil + } + + if len(p.errors) > 0 { + return nil + } + + // Extract resource information from the statement + query := &ParsedQuery{ + Statement: stmt, + Context: &QueryContext{CurrentTime: time.Now()}, + } + + switch s := stmt.(type) { + case *SelectStatement: + if s.From != nil { + query.ResourceType = s.From.ResourceType + query.ResourceName = s.From.ResourceName + query.Groups = s.From.Groups + } + case *TopNStatement: + if s.From != nil { + query.ResourceType = s.From.ResourceType + query.ResourceName = s.From.ResourceName + query.Groups = s.From.Groups + } + } + + return query +} + +// parseSelectStatement parses a SELECT statement +func (p *Parser) parseSelectStatement() *SelectStatement { + stmt := &SelectStatement{} + + if !p.expectToken(TokenSelect) { + return nil + } + + // Parse projection + stmt.Projection = p.parseProjection() + if stmt.Projection == nil { + return nil + } + + // Parse optional FROM clause + if p.currentTokenIs(TokenFrom) { + stmt.From = p.parseFromClause() + } + + // Parse optional TIME clause + if p.currentTokenIs(TokenTime) { + stmt.Time = p.parseTimeCondition() + } + + // Parse optional WHERE clause + if p.currentTokenIs(TokenWhere) { + stmt.Where = p.parseWhereClause() + } + + // Parse optional GROUP BY clause + if p.currentTokenIs(TokenGroupBy) { + stmt.GroupBy = p.parseGroupByClause() + } + + // Parse optional ORDER BY clause + if p.currentTokenIs(TokenOrderBy) { + stmt.OrderBy = p.parseOrderByClause() + } + + // Parse optional LIMIT clause + if p.currentTokenIs(TokenLimit) { + p.nextToken() + if p.currentTokenIs(TokenInteger) { + if limit, err := strconv.Atoi(p.currentToken.Literal); err == nil { + stmt.Limit = &limit + } else { + p.addError("invalid LIMIT value: %s", p.currentToken.Literal) + } + p.nextToken() + } else { + p.addError("expected integer after LIMIT") + } + } + + // Parse optional OFFSET clause + if 
p.currentTokenIs(TokenOffset) { + p.nextToken() + if p.currentTokenIs(TokenInteger) { + if offset, err := strconv.Atoi(p.currentToken.Literal); err == nil { + stmt.Offset = &offset + } else { + p.addError("invalid OFFSET value: %s", p.currentToken.Literal) + } + p.nextToken() + } else { + p.addError("expected integer after OFFSET") + } + } + + // Parse optional WITH QUERY_TRACE clause + if p.currentTokenIs(TokenWith) { + p.nextToken() + if p.currentTokenIs(TokenQueryTrace) { + stmt.QueryTrace = true + p.nextToken() + } else { + p.addError("expected QUERY_TRACE after WITH") + } + } + + return stmt +} + +// parseTopNStatement parses a SHOW TOP N statement +func (p *Parser) parseTopNStatement() *TopNStatement { + stmt := &TopNStatement{} + + if !p.expectToken(TokenShow) { + return nil + } + + if !p.expectToken(TokenTop) { + return nil + } + + // Parse N + if p.currentTokenIs(TokenInteger) { + if n, err := strconv.Atoi(p.currentToken.Literal); err == nil { + stmt.TopN = n + } else { + p.addError("invalid TOP N value: %s", p.currentToken.Literal) + return nil + } + p.nextToken() + } else { + p.addError("expected integer after TOP") + return nil + } + + // Parse FROM clause (mandatory for Top-N) + if p.currentTokenIs(TokenFrom) { + stmt.From = p.parseFromClause() + if stmt.From == nil { + return nil + } + } else { + p.addError("FROM clause is mandatory for TOP N queries") + return nil + } + + // Parse optional TIME clause + if p.currentTokenIs(TokenTime) { + stmt.Time = p.parseTimeCondition() + } + + // Parse optional WHERE clause + if p.currentTokenIs(TokenWhere) { + stmt.Where = p.parseWhereClause() + } + + // Parse optional AGGREGATE BY clause + if p.currentTokenIs(TokenAggregateBy) { + p.nextToken() + stmt.AggregateBy = p.parseAggregateFunction() + } + + // Parse optional ORDER BY clause + if p.currentTokenIs(TokenOrderBy) { + stmt.OrderBy = p.parseOrderByClause() + } + + // Parse optional WITH QUERY_TRACE clause + if p.currentTokenIs(TokenWith) { + p.nextToken() + 
if p.currentTokenIs(TokenQueryTrace) { + stmt.QueryTrace = true + p.nextToken() + } else { + p.addError("expected QUERY_TRACE after WITH") + } + } + + return stmt +} + +// parseProjection parses the SELECT projection +func (p *Parser) parseProjection() *Projection { + projection := &Projection{} + + if p.currentTokenIs(TokenStar) { + projection.All = true + p.nextToken() + return projection + } + + // Check for empty projection SELECT () + if p.currentTokenIs(TokenLeftParen) { + p.nextToken() + if p.currentTokenIs(TokenRightParen) { + projection.Empty = true + p.nextToken() + return projection + } else { + p.addError("expected ) after (") + return nil + } + } + + // Check for TOP N projection + if p.currentTokenIs(TokenTop) { + p.nextToken() + if p.currentTokenIs(TokenInteger) { + if n, err := strconv.Atoi(p.currentToken.Literal); err == nil { + projection.TopN = &TopNProjection{N: n} + p.nextToken() + + // Parse the rest of the projection after TOP N + if p.currentTokenIs(TokenStar) { + projection.TopN.Projection = &Projection{All: true} + p.nextToken() + } else { + // Parse column list + projection.TopN.Projection = &Projection{} + projection.TopN.Projection.Columns = p.parseColumnList() + } + return projection + } else { + p.addError("invalid TOP N value: %s", p.currentToken.Literal) + return nil + } + } else { + p.addError("expected integer after TOP") + return nil + } + } + + // Parse column list + projection.Columns = p.parseColumnList() + if projection.Columns == nil { + return nil + } + + return projection +} + +// parseColumnList parses a comma-separated list of columns +func (p *Parser) parseColumnList() []*Column { + var columns []*Column + + for { + col := p.parseColumn() + if col == nil { + return nil + } + columns = append(columns, col) + + if !p.currentTokenIs(TokenComma) { + break + } + p.nextToken() // consume comma + } + + return columns +} + +// parseColumn parses a single column +func (p *Parser) parseColumn() *Column { + col := &Column{} + + // 
Check for aggregate functions + if p.isAggregateFunction(p.currentToken.Type) { + col.Function = p.parseAggregateFunction() + return col + } + + // Parse column name + if p.currentTokenIs(TokenIdentifier) { + col.Name = p.currentToken.Literal + p.nextToken() + + // Check for type disambiguator ::tag or ::field + if p.currentTokenIs(TokenDoubleColon) { + p.nextToken() + if p.currentTokenIs(TokenIdentifier) { + switch strings.ToLower(p.currentToken.Literal) { + case "tag": + col.Type = ColumnTypeTag + case "field": + col.Type = ColumnTypeField + default: + p.addError("invalid column type: %s. Expected 'tag' or 'field'", p.currentToken.Literal) + return nil + } + p.nextToken() + } else { + p.addError("expected 'tag' or 'field' after ::") + return nil + } + } + } else { + p.addError("expected column name") + return nil + } + + return col +} + +// parseAggregateFunction parses an aggregate function +func (p *Parser) parseAggregateFunction() *AggregateFunction { + fn := &AggregateFunction{} + + if p.isAggregateFunction(p.currentToken.Type) { + fn.Function = strings.ToUpper(p.currentToken.Literal) + p.nextToken() + + if !p.expectToken(TokenLeftParen) { + return nil + } + + if p.currentTokenIs(TokenIdentifier) { + fn.Column = p.currentToken.Literal + p.nextToken() + } else { + p.addError("expected column name in aggregate function") + return nil + } + + if !p.expectToken(TokenRightParen) { + return nil + } + } + + return fn +} + +// isAggregateFunction checks if token type is an aggregate function +func (p *Parser) isAggregateFunction(tokenType TokenType) bool { + return tokenType == TokenSum || tokenType == TokenMean || tokenType == TokenAvg || + tokenType == TokenCount || tokenType == TokenMax || tokenType == TokenMin +} + +// parseFromClause parses the FROM clause +func (p *Parser) parseFromClause() *FromClause { + if !p.expectToken(TokenFrom) { + return nil + } + + from := &FromClause{} + + // Parse resource type + switch p.currentToken.Type { + case TokenStream: + 
from.ResourceType = ResourceTypeStream + p.nextToken() + case TokenMeasure: + from.ResourceType = ResourceTypeMeasure + p.nextToken() + case TokenTrace: + from.ResourceType = ResourceTypeTrace + p.nextToken() + case TokenProperty: + from.ResourceType = ResourceTypeProperty + p.nextToken() + case TokenIdentifier: + // Only allow specific resource types, reject others + switch strings.ToUpper(p.currentToken.Literal) { + case "STREAM": + from.ResourceType = ResourceTypeStream + case "MEASURE": + from.ResourceType = ResourceTypeMeasure + case "TRACE": + from.ResourceType = ResourceTypeTrace + case "PROPERTY": + from.ResourceType = ResourceTypeProperty + default: + p.addError("expected resource type (STREAM, MEASURE, TRACE, PROPERTY), got: %s", p.currentToken.Literal) + return nil + } + p.nextToken() + default: + p.addError("expected resource type (STREAM, MEASURE, TRACE, PROPERTY) or resource name") + return nil + } + + // Parse resource name + if p.currentTokenIs(TokenIdentifier) { + from.ResourceName = p.currentToken.Literal + p.nextToken() + } else { + p.addError("expected resource name") + return nil + } + + // Parse optional IN (groups) clause + if p.currentTokenIs(TokenIn) { + p.nextToken() + if !p.expectToken(TokenLeftParen) { + return nil + } + + // Parse group list + for { + if p.currentTokenIs(TokenIdentifier) { + from.Groups = append(from.Groups, p.currentToken.Literal) + p.nextToken() + } else { + p.addError("expected group name") + return nil + } + + if !p.currentTokenIs(TokenComma) { + break + } + p.nextToken() // consume comma + } + + if !p.expectToken(TokenRightParen) { + return nil + } + } + + return from +} + +// parseTimeCondition parses TIME conditions +func (p *Parser) parseTimeCondition() *TimeCondition { + if !p.expectToken(TokenTime) { + return nil + } + + condition := &TimeCondition{} + + // Parse operator + switch p.currentToken.Type { + case TokenEqual: + condition.Operator = TimeOpEqual + p.nextToken() + condition.Timestamp = 
p.parseTimestamp() + case TokenGreater: + condition.Operator = TimeOpGreater + p.nextToken() + condition.Timestamp = p.parseTimestamp() + case TokenLess: + condition.Operator = TimeOpLess + p.nextToken() + condition.Timestamp = p.parseTimestamp() + case TokenGreaterEqual: + condition.Operator = TimeOpGreaterEqual + p.nextToken() + condition.Timestamp = p.parseTimestamp() + case TokenLessEqual: + condition.Operator = TimeOpLessEqual + p.nextToken() + condition.Timestamp = p.parseTimestamp() + case TokenBetween: + condition.Operator = TimeOpBetween + p.nextToken() + condition.Begin = p.parseTimestamp() + if !p.expectToken(TokenAnd) { + return nil + } + condition.End = p.parseTimestamp() + default: + p.addError("expected time operator (=, >, <, >=, <=, BETWEEN)") + return nil + } + + return condition +} + +// parseTimestamp parses a timestamp (string or integer) +func (p *Parser) parseTimestamp() string { + if p.currentTokenIs(TokenString) || p.currentTokenIs(TokenInteger) { + timestamp := p.currentToken.Literal + p.nextToken() + return timestamp + } + + p.addError("expected timestamp") + return "" +} + +// parseWhereClause parses WHERE clause +func (p *Parser) parseWhereClause() *WhereClause { + if !p.expectToken(TokenWhere) { + return nil + } + + where := &WhereClause{} + where.Conditions = p.parseConditions() + + return where +} + +// parseConditions parses conditions with AND/OR logic +func (p *Parser) parseConditions() []*Condition { + var conditions []*Condition + + condition := p.parseCondition() + if condition == nil { + return nil + } + conditions = append(conditions, condition) + + for p.currentTokenIs(TokenAnd) || p.currentTokenIs(TokenOr) { + if p.currentTokenIs(TokenAnd) { + condition.Logic = LogicAnd + } else { + condition.Logic = LogicOr + } + p.nextToken() + + condition = p.parseCondition() + if condition == nil { + return nil + } + conditions = append(conditions, condition) + } + + return conditions +} + +// parseCondition parses a single condition 
+func (p *Parser) parseCondition() *Condition { + condition := &Condition{} + + // Parse left side (column name) + if p.currentTokenIs(TokenIdentifier) { + condition.Left = p.currentToken.Literal + p.nextToken() + } else { + p.addError("expected column name in condition") + return nil + } + + // Parse operator + switch p.currentToken.Type { + case TokenEqual: + condition.Operator = OpEqual + case TokenNotEqual: + condition.Operator = OpNotEqual + case TokenGreater: + condition.Operator = OpGreater + case TokenLess: + condition.Operator = OpLess + case TokenGreaterEqual: + condition.Operator = OpGreaterEqual + case TokenLessEqual: + condition.Operator = OpLessEqual + case TokenIn: + condition.Operator = OpIn + case TokenNotIn: + condition.Operator = OpNotIn + case TokenHaving: + condition.Operator = OpHaving + case TokenNotHaving: + condition.Operator = OpNotHaving + case TokenMatch: + condition.Operator = OpMatch + default: + p.addError("expected comparison operator") + return nil + } + p.nextToken() + + // Parse right side + if condition.Operator == OpIn || condition.Operator == OpNotIn { + // Parse value list + if !p.expectToken(TokenLeftParen) { + return nil + } + + for { + value := p.parseValue() + if value == nil { + return nil + } + condition.Values = append(condition.Values, value) + + if !p.currentTokenIs(TokenComma) { + break + } + p.nextToken() // consume comma + } + + if !p.expectToken(TokenRightParen) { + return nil + } + } else { + // Parse single value + condition.Right = p.parseValue() + if condition.Right == nil { + return nil + } + } + + return condition +} + +// parseValue parses a value (string, integer, or null) +func (p *Parser) parseValue() *Value { + value := &Value{} + + switch p.currentToken.Type { + case TokenString: + value.Type = ValueTypeString + value.StringVal = p.currentToken.Literal + p.nextToken() + case TokenInteger: + value.Type = ValueTypeInteger + if i, err := strconv.ParseInt(p.currentToken.Literal, 10, 64); err == nil { + 
value.Integer = i + } else { + p.addError("invalid integer: %s", p.currentToken.Literal) + return nil + } + p.nextToken() + case TokenNull: + value.Type = ValueTypeNull + value.IsNull = true + p.nextToken() + default: + p.addError("expected value (string, integer, or NULL)") + return nil + } + + return value +} + +// parseGroupByClause parses GROUP BY clause +func (p *Parser) parseGroupByClause() *GroupByClause { + if !p.expectToken(TokenGroupBy) { + return nil + } + + groupBy := &GroupByClause{} + + // Parse column list + for { + if p.currentTokenIs(TokenIdentifier) { + groupBy.Columns = append(groupBy.Columns, p.currentToken.Literal) + p.nextToken() + } else { + p.addError("expected column name in GROUP BY") + return nil + } + + if !p.currentTokenIs(TokenComma) { + break + } + p.nextToken() // consume comma + } + + return groupBy +} + +// parseOrderByClause parses ORDER BY clause +func (p *Parser) parseOrderByClause() *OrderByClause { + if !p.expectToken(TokenOrderBy) { + return nil + } + + orderBy := &OrderByClause{} + + // Parse column name (or "value" for Top-N queries) + if p.currentTokenIs(TokenIdentifier) { + orderBy.Column = p.currentToken.Literal + p.nextToken() + } else if p.currentTokenIs(TokenValue) { + orderBy.Column = "value" + p.nextToken() + } else { + p.addError("expected column name in ORDER BY") + return nil + } + + // Parse optional ASC/DESC + if p.currentTokenIs(TokenAsc) { + orderBy.Desc = false + p.nextToken() + } else if p.currentTokenIs(TokenDesc) { + orderBy.Desc = true + p.nextToken() + } + + return orderBy +} + +// ParseQuery is a convenience function to parse a BydbQL query string +func ParseQuery(query string) (*ParsedQuery, []string) { + lexer := NewLexer(query) + parser := NewParser(lexer) + parsed := parser.Parse() + return parsed, parser.Errors() +} \ No newline at end of file diff --git a/pkg/bydbql/translator.go b/pkg/bydbql/translator.go new file mode 100644 index 00000000..ca41b8fd --- /dev/null +++ b/pkg/bydbql/translator.go 
@@ -0,0 +1,591 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package bydbql + +import ( + "fmt" + "strconv" + "strings" + "time" + + str2duration "github.com/xhit/go-str2duration/v2" + "sigs.k8s.io/yaml" +) + +// QueryYAML represents the YAML structure for BanyanDB queries +type QueryYAML struct { + // Common fields + Name string `yaml:"name,omitempty"` + Groups []string `yaml:"groups,omitempty"` + TimeRange *TimeRangeYAML `yaml:"timeRange,omitempty"` + Criteria []map[string]interface{} `yaml:"criteria,omitempty"` + Limit *int `yaml:"limit,omitempty"` + Offset *int `yaml:"offset,omitempty"` + Trace bool `yaml:"trace,omitempty"` + + // Stream/Trace specific + Projection []string `yaml:"projection,omitempty"` + OrderBy *OrderByYAML `yaml:"orderBy,omitempty"` + + // Measure specific + TagProjection []string `yaml:"tagProjection,omitempty"` + FieldProjection []string `yaml:"fieldProjection,omitempty"` + GroupBy *GroupByYAML `yaml:"groupBy,omitempty"` + Agg *AggregationYAML `yaml:"agg,omitempty"` + Top *TopYAML `yaml:"top,omitempty"` + + // Property specific + IDs []string `yaml:"ids,omitempty"` + + // Top-N specific (for separate endpoint) + TopN int `yaml:"topN,omitempty"` + FieldValueSort 
string `yaml:"fieldValueSort,omitempty"` + Conditions []map[string]interface{} `yaml:"conditions,omitempty"` +} + +// TimeRangeYAML represents time range in YAML +type TimeRangeYAML struct { + Begin string `yaml:"begin"` + End string `yaml:"end"` +} + +// OrderByYAML represents ORDER BY clause in YAML +type OrderByYAML struct { + IndexRuleName string `yaml:"indexRuleName"` + Sort string `yaml:"sort"` // ASC or DESC +} + +// GroupByYAML represents GROUP BY clause in YAML +type GroupByYAML struct { + TagProjection []string `yaml:"tagProjection"` +} + +// AggregationYAML represents aggregation function in YAML +type AggregationYAML struct { + Function string `yaml:"function"` + FieldName string `yaml:"fieldName"` +} + +// TopYAML represents TOP N clause in YAML +type TopYAML struct { + Number int `yaml:"number"` + FieldName string `yaml:"fieldName"` + FieldValueSort string `yaml:"fieldValueSort"` +} + +// Translator converts parsed BydbQL to YAML format +type Translator struct { + context *QueryContext +} + +// NewTranslator creates a new translator +func NewTranslator(context *QueryContext) *Translator { + if context == nil { + context = &QueryContext{CurrentTime: time.Now()} + } + return &Translator{context: context} +} + +// TranslateToYAML translates a parsed query to YAML format +func (t *Translator) TranslateToYAML(query *ParsedQuery) ([]byte, error) { + yamlQuery, err := t.translateQuery(query) + if err != nil { + return nil, err + } + + return yaml.Marshal(yamlQuery) +} + +// TranslateToMap translates a parsed query to a map suitable for JSON/YAML conversion +func (t *Translator) TranslateToMap(query *ParsedQuery) (map[string]interface{}, error) { + yamlQuery, err := t.translateQuery(query) + if err != nil { + return nil, err + } + + yamlBytes, err := yaml.Marshal(yamlQuery) + if err != nil { + return nil, err + } + + var result map[string]interface{} + err = yaml.Unmarshal(yamlBytes, &result) + return result, err +} + +// translateQuery translates the main 
query structure +func (t *Translator) translateQuery(query *ParsedQuery) (*QueryYAML, error) { + yamlQuery := &QueryYAML{} + + // Set common fields from context or FROM clause + if query.ResourceName != "" { + yamlQuery.Name = query.ResourceName + } else if t.context.DefaultResourceName != "" { + yamlQuery.Name = t.context.DefaultResourceName + } + + if len(query.Groups) > 0 { + yamlQuery.Groups = query.Groups + } else if t.context.DefaultGroup != "" { + yamlQuery.Groups = []string{t.context.DefaultGroup} + } + + switch stmt := query.Statement.(type) { + case *SelectStatement: + return t.translateSelectStatement(stmt, yamlQuery, query) + case *TopNStatement: + return t.translateTopNStatement(stmt, yamlQuery, query) + default: + return nil, fmt.Errorf("unsupported statement type") + } +} + +// translateSelectStatement translates SELECT statements +func (t *Translator) translateSelectStatement(stmt *SelectStatement, yamlQuery *QueryYAML, query *ParsedQuery) (*QueryYAML, error) { + // Set query trace flag + yamlQuery.Trace = stmt.QueryTrace + + // Translate TIME clause + if stmt.Time != nil { + timeRange, err := t.translateTimeCondition(stmt.Time) + if err != nil { + return nil, err + } + yamlQuery.TimeRange = timeRange + } + + // Translate WHERE clause + if stmt.Where != nil { + criteria, err := t.translateWhereClause(stmt.Where) + if err != nil { + return nil, err + } + yamlQuery.Criteria = criteria + } + + // Set LIMIT and OFFSET + yamlQuery.Limit = stmt.Limit + yamlQuery.Offset = stmt.Offset + + // Translate based on resource type + resourceType := query.ResourceType + if resourceType == ResourceTypeAuto { + resourceType = t.context.DefaultResourceType + } + + switch resourceType { + case ResourceTypeStream, ResourceTypeTrace: + return t.translateStreamOrTraceQuery(stmt, yamlQuery) + case ResourceTypeMeasure: + return t.translateMeasureQuery(stmt, yamlQuery) + case ResourceTypeProperty: + return t.translatePropertyQuery(stmt, yamlQuery) + default: + // Auto-detect 
or use default + return t.translateStreamOrTraceQuery(stmt, yamlQuery) + } +} + +// translateStreamOrTraceQuery translates stream/trace specific fields +func (t *Translator) translateStreamOrTraceQuery(stmt *SelectStatement, yamlQuery *QueryYAML) (*QueryYAML, error) { + // Translate projection + if stmt.Projection != nil { + if stmt.Projection.All { + // SELECT * - no specific projection needed + } else if stmt.Projection.Empty { + // SELECT () - empty projection for traces + yamlQuery.Projection = []string{} + } else if len(stmt.Projection.Columns) > 0 { + for _, col := range stmt.Projection.Columns { + yamlQuery.Projection = append(yamlQuery.Projection, col.Name) + } + } + } + + // Translate ORDER BY + if stmt.OrderBy != nil { + yamlQuery.OrderBy = &OrderByYAML{ + IndexRuleName: stmt.OrderBy.Column, + } + if stmt.OrderBy.Desc { + yamlQuery.OrderBy.Sort = "DESC" + } else { + yamlQuery.OrderBy.Sort = "ASC" + } + } + + return yamlQuery, nil +} + +// translateMeasureQuery translates measure specific fields +func (t *Translator) translateMeasureQuery(stmt *SelectStatement, yamlQuery *QueryYAML) (*QueryYAML, error) { + // Translate projection + if stmt.Projection != nil { + if stmt.Projection.All { + // SELECT * - no specific projection needed + } else if stmt.Projection.TopN != nil { + // Handle TOP N projection + yamlQuery.Top = &TopYAML{ + Number: stmt.Projection.TopN.N, + } + if stmt.OrderBy != nil { + if stmt.OrderBy.Desc { + yamlQuery.Top.FieldValueSort = "DESC" + } else { + yamlQuery.Top.FieldValueSort = "ASC" + } + } + } else if len(stmt.Projection.Columns) > 0 { + // Separate tags and fields based on column type or function + for _, col := range stmt.Projection.Columns { + if col.Function != nil { + // Aggregate function + yamlQuery.Agg = &AggregationYAML{ + Function: col.Function.Function, + FieldName: col.Function.Column, + } + } else { + // Regular column - determine if tag or field + switch col.Type { + case ColumnTypeTag: + yamlQuery.TagProjection = 
append(yamlQuery.TagProjection, col.Name) + case ColumnTypeField: + yamlQuery.FieldProjection = append(yamlQuery.FieldProjection, col.Name) + default: + // Auto-detect: assume tag for now (schema lookup would be needed) + yamlQuery.TagProjection = append(yamlQuery.TagProjection, col.Name) + } + } + } + } + } + + // Translate GROUP BY + if stmt.GroupBy != nil { + yamlQuery.GroupBy = &GroupByYAML{ + TagProjection: stmt.GroupBy.Columns, + } + } + + // Translate ORDER BY + if stmt.OrderBy != nil { + yamlQuery.OrderBy = &OrderByYAML{ + IndexRuleName: stmt.OrderBy.Column, + } + if stmt.OrderBy.Desc { + yamlQuery.OrderBy.Sort = "DESC" + } else { + yamlQuery.OrderBy.Sort = "ASC" + } + } + + return yamlQuery, nil +} + +// translatePropertyQuery translates property specific fields +func (t *Translator) translatePropertyQuery(stmt *SelectStatement, yamlQuery *QueryYAML) (*QueryYAML, error) { + // Translate projection + if stmt.Projection != nil && !stmt.Projection.All && len(stmt.Projection.Columns) > 0 { + for _, col := range stmt.Projection.Columns { + yamlQuery.Projection = append(yamlQuery.Projection, col.Name) + } + } + + // Extract IDs from WHERE clause if present + if stmt.Where != nil { + for _, condition := range stmt.Where.Conditions { + if strings.ToUpper(condition.Left) == "ID" { + if condition.Operator == OpEqual && condition.Right != nil { + yamlQuery.IDs = []string{condition.Right.StringVal} + } else if condition.Operator == OpIn && len(condition.Values) > 0 { + for _, val := range condition.Values { + yamlQuery.IDs = append(yamlQuery.IDs, val.StringVal) + } + } + } + } + } + + return yamlQuery, nil +} + +// translateTopNStatement translates SHOW TOP N statements +func (t *Translator) translateTopNStatement(stmt *TopNStatement, yamlQuery *QueryYAML, query *ParsedQuery) (*QueryYAML, error) { + // Set Top-N specific fields + yamlQuery.TopN = stmt.TopN + yamlQuery.Trace = stmt.QueryTrace + + // Translate TIME clause + if stmt.Time != nil { + timeRange, err := 
t.translateTimeCondition(stmt.Time) + if err != nil { + return nil, err + } + yamlQuery.TimeRange = timeRange + } + + // Translate WHERE clause to conditions (Top-N uses different field name) + if stmt.Where != nil { + conditions, err := t.translateWhereClause(stmt.Where) + if err != nil { + return nil, err + } + yamlQuery.Conditions = conditions + } + + // Translate AGGREGATE BY + if stmt.AggregateBy != nil { + yamlQuery.Agg = &AggregationYAML{ + Function: stmt.AggregateBy.Function, + FieldName: stmt.AggregateBy.Column, + } + } + + // Translate ORDER BY + if stmt.OrderBy != nil { + if stmt.OrderBy.Desc { + yamlQuery.FieldValueSort = "DESC" + } else { + yamlQuery.FieldValueSort = "ASC" + } + } + + return yamlQuery, nil +} + +// translateTimeCondition translates TIME conditions to time range +func (t *Translator) translateTimeCondition(timeCondition *TimeCondition) (*TimeRangeYAML, error) { + timeRange := &TimeRangeYAML{} + + switch timeCondition.Operator { + case TimeOpEqual: + timestamp, err := t.parseTimestamp(timeCondition.Timestamp) + if err != nil { + return nil, err + } + timeRange.Begin = timestamp + timeRange.End = timestamp + case TimeOpGreater: + timestamp, err := t.parseTimestamp(timeCondition.Timestamp) + if err != nil { + return nil, err + } + timeRange.Begin = timestamp + // End time not specified, use current time + timeRange.End = t.context.CurrentTime.Format(time.RFC3339) + case TimeOpLess: + timestamp, err := t.parseTimestamp(timeCondition.Timestamp) + if err != nil { + return nil, err + } + // Begin time not specified, use a time far in the past + timeRange.Begin = "1970-01-01T00:00:00Z" + timeRange.End = timestamp + case TimeOpGreaterEqual: + timestamp, err := t.parseTimestamp(timeCondition.Timestamp) + if err != nil { + return nil, err + } + timeRange.Begin = timestamp + timeRange.End = t.context.CurrentTime.Format(time.RFC3339) + case TimeOpLessEqual: + timestamp, err := t.parseTimestamp(timeCondition.Timestamp) + if err != nil { + return nil, 
err + } + timeRange.Begin = "1970-01-01T00:00:00Z" + timeRange.End = timestamp + case TimeOpBetween: + begin, err := t.parseTimestamp(timeCondition.Begin) + if err != nil { + return nil, err + } + end, err := t.parseTimestamp(timeCondition.End) + if err != nil { + return nil, err + } + timeRange.Begin = begin + timeRange.End = end + } + + return timeRange, nil +} + +// parseTimestamp parses timestamp string (absolute or relative) +func (t *Translator) parseTimestamp(timestamp string) (string, error) { + // Try parsing as absolute time first (RFC3339) + if parsedTime, err := time.Parse(time.RFC3339, timestamp); err == nil { + return parsedTime.Format(time.RFC3339), nil + } + + // Try parsing as relative time (duration string) + if strings.ToLower(timestamp) == "now" { + return t.context.CurrentTime.Format(time.RFC3339), nil + } + + duration, err := str2duration.ParseDuration(timestamp) + if err != nil { + return "", fmt.Errorf("invalid timestamp format: %s", timestamp) + } + + resultTime := t.context.CurrentTime.Add(duration) + return resultTime.Format(time.RFC3339), nil +} + +// translateWhereClause translates WHERE conditions to criteria format +func (t *Translator) translateWhereClause(where *WhereClause) ([]map[string]interface{}, error) { + var criteria []map[string]interface{} + + for _, condition := range where.Conditions { + criterion := make(map[string]interface{}) + + // Set tag name + criterion["tagName"] = condition.Left + + // Set operator and values + switch condition.Operator { + case OpEqual: + criterion["op"] = "BINARY_OP_EQ" + if condition.Right != nil { + criterion["value"] = t.translateValue(condition.Right) + } + case OpNotEqual: + criterion["op"] = "BINARY_OP_NE" + if condition.Right != nil { + criterion["value"] = t.translateValue(condition.Right) + } + case OpGreater: + criterion["op"] = "BINARY_OP_GT" + if condition.Right != nil { + criterion["value"] = t.translateValue(condition.Right) + } + case OpLess: + criterion["op"] = "BINARY_OP_LT" + 
if condition.Right != nil { + criterion["value"] = t.translateValue(condition.Right) + } + case OpGreaterEqual: + criterion["op"] = "BINARY_OP_GE" + if condition.Right != nil { + criterion["value"] = t.translateValue(condition.Right) + } + case OpLessEqual: + criterion["op"] = "BINARY_OP_LE" + if condition.Right != nil { + criterion["value"] = t.translateValue(condition.Right) + } + case OpIn: + criterion["op"] = "BINARY_OP_IN" + var values []interface{} + for _, val := range condition.Values { + values = append(values, t.translateValue(val)) + } + criterion["value"] = map[string]interface{}{ + "strArray": map[string]interface{}{ + "value": values, + }, + } + case OpNotIn: + criterion["op"] = "BINARY_OP_NOT_IN" + var values []interface{} + for _, val := range condition.Values { + values = append(values, t.translateValue(val)) + } + criterion["value"] = map[string]interface{}{ + "strArray": map[string]interface{}{ + "value": values, + }, + } + case OpHaving: + criterion["op"] = "BINARY_OP_HAVING" + var values []interface{} + for _, val := range condition.Values { + values = append(values, t.translateValue(val)) + } + criterion["value"] = map[string]interface{}{ + "strArray": map[string]interface{}{ + "value": values, + }, + } + case OpNotHaving: + criterion["op"] = "BINARY_OP_NOT_HAVING" + var values []interface{} + for _, val := range condition.Values { + values = append(values, t.translateValue(val)) + } + criterion["value"] = map[string]interface{}{ + "strArray": map[string]interface{}{ + "value": values, + }, + } + case OpMatch: + criterion["op"] = "BINARY_OP_MATCH" + if condition.Right != nil { + criterion["value"] = t.translateValue(condition.Right) + } + } + + criteria = append(criteria, criterion) + } + + return criteria, nil +} + +// translateValue translates a value to the appropriate YAML format +func (t *Translator) translateValue(value *Value) interface{} { + if value.IsNull { + return nil + } + + switch value.Type { + case ValueTypeString: + return 
map[string]interface{}{ + "str": map[string]interface{}{ + "value": value.StringVal, + }, + } + case ValueTypeInteger: + return map[string]interface{}{ + "int": map[string]interface{}{ + "value": strconv.FormatInt(value.Integer, 10), + }, + } + default: + return nil + } +} + +// TranslateQuery is a convenience function to translate a BydbQL query string to YAML +func TranslateQuery(query string, context *QueryContext) ([]byte, []string, error) { + parsed, errors := ParseQuery(query) + if len(errors) > 0 { + return nil, errors, fmt.Errorf("parsing errors occurred") + } + + if parsed == nil { + return nil, []string{"failed to parse query"}, fmt.Errorf("parsing failed") + } + + translator := NewTranslator(context) + yamlBytes, err := translator.TranslateToYAML(parsed) + if err != nil { + return nil, []string{err.Error()}, err + } + + return yamlBytes, nil, nil +} \ No newline at end of file
