jason810496 commented on code in PR #67315: URL: https://github.com/apache/airflow/pull/67315#discussion_r3296225102
########## go-sdk/pkg/execution/messages.go: ########## @@ -0,0 +1,405 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package execution + +import ( + "fmt" + "time" +) + +// Inbound messages (Supervisor -> Runtime). + +// TaskInstanceInfo holds task instance details from StartupDetails. 
+type TaskInstanceInfo struct { + ID string + TaskID string + DagID string + RunID string + TryNumber int + DagVersionID string + MapIndex int + ContextCarrier map[string]any +} + +func decodeTaskInstanceInfo(m map[string]any) (TaskInstanceInfo, error) { + if m == nil { + return TaskInstanceInfo{}, fmt.Errorf("nil task instance map") + } + id, err := mapString(m, "id") + if err != nil { + return TaskInstanceInfo{}, fmt.Errorf("ti.id: %w", err) + } + taskID, err := mapString(m, "task_id") + if err != nil { + return TaskInstanceInfo{}, fmt.Errorf("ti.task_id: %w", err) + } + dagID, err := mapString(m, "dag_id") + if err != nil { + return TaskInstanceInfo{}, fmt.Errorf("ti.dag_id: %w", err) + } + runID, err := mapString(m, "run_id") + if err != nil { + return TaskInstanceInfo{}, fmt.Errorf("ti.run_id: %w", err) + } + tryNumber := mapIntOr(m, "try_number", 1) + dagVersionID := mapStringOr(m, "dag_version_id", "") + mapIndex := mapIntOr(m, "map_index", -1) + contextCarrier := mapMap(m, "context_carrier") + + return TaskInstanceInfo{ + ID: id, + TaskID: taskID, + DagID: dagID, + RunID: runID, + TryNumber: tryNumber, + DagVersionID: dagVersionID, + MapIndex: mapIndex, + ContextCarrier: contextCarrier, + }, nil +} + +// BundleInfoMsg holds bundle identification from StartupDetails. +type BundleInfoMsg struct { + Name string + Version string +} + +func decodeBundleInfo(m map[string]any) BundleInfoMsg { + if m == nil { + return BundleInfoMsg{} + } + return BundleInfoMsg{ + Name: mapStringOr(m, "name", ""), + Version: mapStringOr(m, "version", ""), + } +} + +// TIRunContext holds the runtime context for a task instance. 
+type TIRunContext struct { + LogicalDate *time.Time + DataIntervalStart *time.Time + DataIntervalEnd *time.Time +} + +func decodeTIRunContext(m map[string]any) (TIRunContext, error) { + if m == nil { + return TIRunContext{}, nil + } + ctx := TIRunContext{} + for _, f := range []struct { + key string + dst **time.Time + }{ + {"logical_date", &ctx.LogicalDate}, + {"data_interval_start", &ctx.DataIntervalStart}, + {"data_interval_end", &ctx.DataIntervalEnd}, + } { + raw, present := m[f.key] + if !present || raw == nil { + continue + } + t, err := asTime(raw) + if err != nil { + return TIRunContext{}, fmt.Errorf("ti_context.%s: %w", f.key, err) + } + *f.dst = &t + } + return ctx, nil +} + +// StartupDetails is sent by the supervisor to initiate task execution. +type StartupDetails struct { + TI TaskInstanceInfo + DagRelPath string + BundleInfo BundleInfoMsg + StartDate time.Time + TIContext TIRunContext + SentryIntegration string +} + +func decodeStartupDetails(m map[string]any) (*StartupDetails, error) { + tiMap := mapMap(m, "ti") + ti, err := decodeTaskInstanceInfo(tiMap) + if err != nil { + return nil, fmt.Errorf("decoding ti: %w", err) + } + + dagRelPath := mapStringOr(m, "dag_rel_path", "") + bundleInfo := decodeBundleInfo(mapMap(m, "bundle_info")) + + var startDate time.Time + if raw, present := m["start_date"]; present && raw != nil { + startDate, err = asTime(raw) + if err != nil { + return nil, fmt.Errorf("start_date: %w", err) + } + } + + tiContext, err := decodeTIRunContext(mapMap(m, "ti_context")) + if err != nil { + return nil, fmt.Errorf("decoding ti_context: %w", err) + } + sentryIntegration := mapStringOr(m, "sentry_integration", "") + + return &StartupDetails{ + TI: ti, + DagRelPath: dagRelPath, + BundleInfo: bundleInfo, + StartDate: startDate, + TIContext: tiContext, + SentryIntegration: sentryIntegration, + }, nil +} + +// Response types (for runtime-initiated requests). + +// ConnectionResult is the response to GetConnection. 
+type ConnectionResult struct { + ConnID string + ConnType string + Host string + Schema string + Login string + Password string + Port int + Extra string +} + +func decodeConnectionResult(m map[string]any) (*ConnectionResult, error) { + return &ConnectionResult{ + ConnID: mapStringOr(m, "conn_id", ""), + ConnType: mapStringOr(m, "conn_type", ""), + Host: mapStringOr(m, "host", ""), + Schema: mapStringOr(m, "schema", ""), + Login: mapStringOr(m, "login", ""), + Password: mapStringOr(m, "password", ""), + Port: mapIntOr(m, "port", 0), + Extra: mapStringOr(m, "extra", ""), + }, nil +} + +// VariableResult is the response to GetVariable. +type VariableResult struct { + Key string + Value any +} + +func decodeVariableResult(m map[string]any) (*VariableResult, error) { + return &VariableResult{ + Key: mapStringOr(m, "key", ""), + Value: m["value"], + }, nil +} + +// XComResult is the response to GetXCom. +type XComResult struct { + Key string + Value any +} + +func decodeXComResult(m map[string]any) (*XComResult, error) { + return &XComResult{ + Key: mapStringOr(m, "key", ""), + Value: m["value"], + }, nil +} + +// ErrorResponse represents an error returned by the supervisor. +type ErrorResponse struct { + Error string + Detail any +} + +func decodeErrorResponse(m map[string]any) *ErrorResponse { + if m == nil { + return nil + } + return &ErrorResponse{ + Error: mapStringOr(m, "error", ""), + Detail: m["detail"], + } +} + +// Outbound messages (Runtime -> Supervisor). + +// GetConnectionMsg is sent to request a connection from the supervisor. +type GetConnectionMsg struct { + ConnID string +} + +func (m GetConnectionMsg) toMap() map[string]any { + return map[string]any{ + "type": "GetConnection", + "conn_id": m.ConnID, + } +} + +// GetVariableMsg is sent to request a variable from the supervisor. 
+type GetVariableMsg struct { + Key string +} + +func (m GetVariableMsg) toMap() map[string]any { + return map[string]any{ + "type": "GetVariable", + "key": m.Key, + } +} + +// GetXComMsg is sent to request an XCom value from the supervisor. +type GetXComMsg struct { + Key string + DagID string + TaskID string + RunID string + MapIndex *int + IncludePriorDates bool +} + +func (m GetXComMsg) toMap() map[string]any { + result := map[string]any{ + "type": "GetXCom", + "key": m.Key, + "dag_id": m.DagID, + "task_id": m.TaskID, + "run_id": m.RunID, + "include_prior_dates": m.IncludePriorDates, + } + if m.MapIndex != nil { + result["map_index"] = *m.MapIndex + } + return result +} + +// SetXComMsg is sent to set an XCom value. +type SetXComMsg struct { + Key string + Value any + DagID string + TaskID string + RunID string + MapIndex int Review Comment: The rest of the comments were addressed in https://github.com/apache/airflow/pull/67315/changes/9ca34584b896f60919dbe61bb5a92935d1434f8d. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
