This is an automated email from the ASF dual-hosted git repository.
kichan pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/trafficserver-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 9a69df8 Issue 298: Add Support for Caching CRD (#300)
9a69df8 is described below
commit 9a69df84f011730bcece3517b8336dd94d3c242b
Author: saurabh-saraswat <[email protected]>
AuthorDate: Thu Sep 11 19:25:03 2025 +0530
Issue 298: Add Support for Caching CRD (#300)
* Issue 298: Add Support for Caching CRD
* Apply caching policies and do clean up
* /Fixing issues encountered in CI automation cases
* /Fixing issues fmt issues
* Added unit test cases
* Resolved error encountered during execution of golangci-lint. And added
documentation for Caching CRD
---
Dockerfile | 2 +
ats_caching/atscachingpolicy.yaml | 19 ++
ats_caching/crd-atscachingpolicy.yaml | 68 +++++
docs/CRD_CACHING.md | 148 ++++++++++
main/main.go | 18 +-
proxy/ats.go | 11 +
proxy/fakeATS.go | 5 +
tests/data/setup/configmaps/ats-configmap.yaml | 7 +-
tests/suite/test_ingress.py | 88 +++++-
watcher/handlerCache.go | 204 ++++++++++++++
watcher/handlerCache_test.go | 195 +++++++++++++
watcher/watcher.go | 51 +++-
watcher/watcher_test.go | 371 ++++++++++++++++++-------
13 files changed, 1061 insertions(+), 126 deletions(-)
diff --git a/Dockerfile b/Dockerfile
index bbc27cd..f1cf0be 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -167,6 +167,8 @@ RUN adduser -S -D -H -u 1000 -h /tmp -s /sbin/nologin -G
ats -g ats ats
COPY --from=builder --chown=ats:ats /opt/ats /opt/ats
+ENV PATH="/opt/ats/bin:${PATH}
+
USER ats
ENTRYPOINT ["/opt/ats/bin/entry.sh"]
diff --git a/ats_caching/atscachingpolicy.yaml
b/ats_caching/atscachingpolicy.yaml
new file mode 100644
index 0000000..765734d
--- /dev/null
+++ b/ats_caching/atscachingpolicy.yaml
@@ -0,0 +1,19 @@
+apiVersion: k8s.trafficserver.apache.com/v1
+kind: ATSCachingPolicy
+metadata:
+ name: my-app-caching
+ namespace: caching-ats-new
+spec:
+ rules:
+ - name: home-endpoint
+ primarySpecifier:
+ type: url_regex
+ pattern: ".*/app1"
+ action: cache
+ ttl: "12s"
+ - name: static-assets
+ primarySpecifier:
+ type: url_regex
+ pattern: "^/app2"
+ action: cache
+ ttl: "20s"
diff --git a/ats_caching/crd-atscachingpolicy.yaml
b/ats_caching/crd-atscachingpolicy.yaml
new file mode 100644
index 0000000..5ef1f17
--- /dev/null
+++ b/ats_caching/crd-atscachingpolicy.yaml
@@ -0,0 +1,68 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: atscachingpolicies.k8s.trafficserver.apache.com
+spec:
+ group: k8s.trafficserver.apache.com
+ scope: Cluster
+ names:
+ plural: atscachingpolicies
+ singular: atscachingpolicy
+ kind: ATSCachingPolicy
+ shortNames:
+ - atscp
+ versions:
+ - name: v1
+ served: true
+ storage: true
+ schema:
+ openAPIV3Schema:
+ type: object
+ properties:
+ spec:
+ type: object
+ properties:
+ rules:
+ type: array
+ description: List of caching rules
+ items:
+ type: object
+ properties:
+ name:
+ type: string
+ description: Human-friendly rule name
+ primarySpecifier:
+ type: object
+ properties:
+ type:
+ type: string
+ description: 'One of url_regex, dest_domain,
dest_host, dest_ip'
+ pattern:
+ type: string
+ description: Pattern to match (regex, domain,
host, or IP)
+ secondarySpecifiers:
+ type: object
+ properties:
+ port:
+ type: integer
+ scheme:
+ type: string
+ method:
+ type: string
+ prefix:
+ type: string
+ suffix:
+ type: string
+ src_ip:
+ type: string
+ time:
+ type: string
+ internal:
+ type: boolean
+ action:
+ type: string
+ description: Cache action (e.g., cache, never-cache)
+ ttl:
+ type: string
+ description: Cache time to live (e.g., "10s", "1h")
+
diff --git a/docs/CRD_CACHING.md b/docs/CRD_CACHING.md
new file mode 100644
index 0000000..936e515
--- /dev/null
+++ b/docs/CRD_CACHING.md
@@ -0,0 +1,148 @@
+# Caching CRD for ATS
+
+
+## Before enabling the cache
+
+Let us check how we can verify whether caching is happening or not using curl
command:
+```bash
+curl -v -H "Host: test.edge.com" http://{minikubeip}:30080/app1
+```
+We need to use the respective ip of the minikube we are using or the node ip
on which the ats ingress controller is running.
+
+The response we receive has the following details along with the HTML response
(output for the first curl command):
+```bash
+> GET /app1 HTTP/1.1
+> Host: test.edge.com
+> User-Agent: curl/8.5.0
+> Accept: */*
+>
+< HTTP/1.1 200 OK
+< X-Powered-By: Express
+< Accept-Ranges: bytes
+< Cache-Control: public, max-age=0
+< Last-Modified: Fri, 18 Jul 2025 07:01:16 GMT
+< Content-Type: text/html; charset=UTF-8
+< Content-Length: 190
+< Date: Wed, 03 Sep 2025 09:48:09 GMT
+< Etag: W/"be-1981c565260"
+< Age: 0
+< Connection: keep-alive
+< Server: ATS/9.2.11
+```
+Now, when we run the same command after 6 seconds, we will have a response
which will have following details:
+```bash
+> GET /app1 HTTP/1.1
+> Host: test.edge.com
+> User-Agent: curl/8.5.0
+> Accept: */*
+>
+< HTTP/1.1 200 OK
+< X-Powered-By: Express
+< Accept-Ranges: bytes
+< Cache-Control: public, max-age=0
+< Last-Modified: Fri, 18 Jul 2025 07:01:16 GMT
+< Content-Type: text/html; charset=UTF-8
+< Content-Length: 190
+< Date: Wed, 03 Sep 2025 09:48:15 GMT
+< Etag: W/"be-1981c565260"
+< Age: 0
+< Connection: keep-alive
+< Server: ATS/9.2.11
+```
+When we observe the details for the response for both the curl executions, the
value for `Age` is `0` and the `Date` field has different values (look for the
seconds), indicating response was not cached.
+
+## Enabling the cache
+
+### Steps to take before applying caching CRD (needed only first time) :
+To apply a file we use `kubectl apply -f <filename.yaml>`.
+- Go to the folder `trafficserver-ingress-controller/ats_caching`.
+- Apply the file `ats-cachingpolicy-role.yaml`.
+- Apply the file `ats-cachingpolicy-binding.yaml`.
+
+The `ats-cachingpolicy-role.yaml` file defines a cluster-wide role named
`ats-cachingpolicy-role`, which grants read-only permissions (`get`, `list`,
`watch`) on the `atscachingpolicies` resource within the
`k8s.trafficserver.apache.com` API group.
+
+The `ats-cachingpolicy-binding.yaml` file binds the `ats-cachingpolicy-role`
cluster role to the `default` service account, which allows the pods running
under the `default` service account to read and watch `ATSCachingPolicy`
objects across the cluster.
+
+### Steps for applying CRD to enable caching:
+- Before applying the CRD check the currently available crds using `kubectl
get crd`.
+- Go to the folder `trafficserver-ingress-controller/ats_caching`
+- Apply the file `crd-atscachingpolicy.yaml`.
+- Apply the file `atscachingpolicy.yaml`.
+- Now again check the available crds, `using kubectl get crd`.
+
+We will notice a new crd:
+```bash
+NAME CREATED AT
+atscachingpolicies.k8s.trafficserver.apache.com 2025-09-03T09:45:13Z
+
+```
+Which was not available earlier.
+
+
+### The content of atscachingpolicy.yaml is:
+```yaml
+apiVersion: k8s.trafficserver.apache.com/v1
+kind: ATSCachingPolicy
+metadata:
+ name: my-app-caching
+ namespace: caching-ats-new
+spec:
+ rules:
+ - name: home-endpoint
+ primarySpecifier:
+ type: url_regex
+ pattern: ".*/app1"
+ action: cache
+ ttl: "12s"
+```
+Here, we have enabled cache for the pattern `.*app1` for `12` seconds. After
`12` seconds of running the curl command the response won’t be available in the
cache.
+
+## After enabling the cache
+Execute the curl command
+```bash
+curl -v -H "Host: test.edge.com" http://{minikubeip}:30080/app1
+```
+The response we receive has the following details along with the HTML
response( if same command was not run few minutes earlier):
+```bash
+> GET /app1 HTTP/1.1
+> Host: test.edge.com
+> User-Agent: curl/8.5.0
+> Accept: */*
+>
+< HTTP/1.1 200 OK
+< X-Powered-By: Express
+< Accept-Ranges: bytes
+< Cache-Control: public, max-age=0
+< Last-Modified: Fri, 18 Jul 2025 07:01:16 GMT
+< Content-Type: text/html; charset=UTF-8
+< Content-Length: 190
+< Date: Wed, 03 Sep 2025 09:51:02 GMT
+< Etag: W/"be-1981c565260"
+< Age: 0
+< Connection: keep-alive
+< Server: ATS/9.2.11
+```
+Now, when we run the same command after 7 seconds we will have a response
along with following details:
+```bash
+> GET /app1 HTTP/1.1
+> Host: test.edge.com
+> User-Agent: curl/8.5.0
+> Accept: */*
+>
+< HTTP/1.1 200 OK
+< X-Powered-By: Express
+< Accept-Ranges: bytes
+< Cache-Control: public, max-age=0
+< Last-Modified: Fri, 18 Jul 2025 07:01:16 GMT
+< Content-Type: text/html; charset=UTF-8
+< Content-Length: 190
+< Date: Wed, 03 Sep 2025 09:51:02 GMT
+< Etag: W/"be-1981c565260"
+< Age: 7
+< Connection: keep-alive
+< Server: ATS/9.2.11
+```
+Observe both curl execution details, in both cases the value for `Age` is
different and the `Date` field has the same values (look for the seconds).
+
+
+
diff --git a/main/main.go b/main/main.go
index f79d9a2..da8949e 100644
--- a/main/main.go
+++ b/main/main.go
@@ -24,9 +24,9 @@ import (
"syscall"
"time"
+ "k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
-
"k8s.io/client-go/tools/clientcmd"
_ "k8s.io/client-go/util/workqueue"
@@ -126,6 +126,11 @@ func main() {
log.Panicln(err.Error())
}
+ dynamicClient, err := dynamic.NewForConfig(config)
+ if err != nil {
+ log.Panicln(err.Error())
+ }
+
stopChan := make(chan struct{})
// ------------ Resolving Namespaces
--------------------------------------
@@ -150,11 +155,12 @@ func main() {
}
watcher := w.Watcher{
- Cs: clientset,
- ATSNamespace: *atsNamespace,
- ResyncPeriod: *resyncPeriod,
- Ep: &endpoint,
- StopChan: stopChan,
+ Cs: clientset,
+ DynamicClient: dynamicClient,
+ ATSNamespace: *atsNamespace,
+ ResyncPeriod: *resyncPeriod,
+ Ep: &endpoint,
+ StopChan: stopChan,
}
err = watcher.Watch()
diff --git a/proxy/ats.go b/proxy/ats.go
index 3e9eca0..b23c702 100644
--- a/proxy/ats.go
+++ b/proxy/ats.go
@@ -27,6 +27,7 @@ import (
type ATSManagerInterface interface {
ConfigSet(k, v string) (string, error)
ConfigGet(k string) (string, error)
+ CacheSet() (string, error)
IncludeIngressClass(c string) bool
}
@@ -58,6 +59,16 @@ func (m *ATSManager) ConfigSet(k, v string) (msg string, err
error) {
return fmt.Sprintf("Ran p.Key: %s p.Val: %s --> stdoutStderr: %q", k,
v, stdoutStderr), nil
}
+func (m *ATSManager) CacheSet() (msg string, err error) {
+ cmd := exec.Command("traffic_ctl", "config", "reload")
+ stdoutStderr, err := cmd.CombinedOutput()
+ if err != nil {
+ return "", fmt.Errorf("failed to execute: traffic_ctl config
reload Error: %s", err.Error())
+ }
+ return fmt.Sprintf("Reload succesful --> stdoutStderr: %q",
stdoutStderr), nil
+
+}
+
func (m *ATSManager) ConfigGet(k string) (msg string, err error) {
cmd := exec.Command("traffic_ctl", "config", "get", k)
stdoutStderr, err := cmd.CombinedOutput()
diff --git a/proxy/fakeATS.go b/proxy/fakeATS.go
index f9dbc8c..14144db 100644
--- a/proxy/fakeATS.go
+++ b/proxy/fakeATS.go
@@ -38,6 +38,11 @@ func (m *FakeATSManager) IncludeIngressClass(c string) bool {
return false
}
+func (m *FakeATSManager) CacheSet() (msg string, err error) {
+ return "Config reload succesful", nil
+
+}
+
func (m *FakeATSManager) ConfigSet(k, v string) (msg string, err error) {
m.Config[k] = v
return fmt.Sprintf("Ran p.Key: %s p.Val: %s", k, v), nil
diff --git a/tests/data/setup/configmaps/ats-configmap.yaml
b/tests/data/setup/configmaps/ats-configmap.yaml
index 7d66ddd..7219922 100644
--- a/tests/data/setup/configmaps/ats-configmap.yaml
+++ b/tests/data/setup/configmaps/ats-configmap.yaml
@@ -17,7 +17,7 @@
apiVersion: v1
kind: Namespace
metadata:
- name: trafficserver-test
+ name: trafficserver-test
---
@@ -26,8 +26,13 @@ kind: ConfigMap
metadata:
namespace: trafficserver-test
name: ats
+ annotations:
+ ats-configmap: "true"
data:
# reloadable data only
proxy.config.output.logfile.rolling_enabled: "1"
proxy.config.output.logfile.rolling_interval_sec: "3000"
proxy.config.restart.active_client_threshold: "0"
+
+ proxy.config.http.cache.http: "1"
+ proxy.config.http.cache.required_headers: "0"
diff --git a/tests/suite/test_ingress.py b/tests/suite/test_ingress.py
index 7d234a7..81e7f4f 100644
--- a/tests/suite/test_ingress.py
+++ b/tests/suite/test_ingress.py
@@ -19,6 +19,7 @@ import pytest
import os
import time
import textwrap
+import subprocess
def kubectl_apply(yaml_path):
os.system('kubectl apply -f ' + yaml_path)
@@ -43,6 +44,9 @@ def setup_module(module):
kubectl_apply('data/setup/apps/')
kubectl_apply('data/setup/ingresses/')
time.sleep(90)
+ kubectl_apply('../ats_caching/crd-atscachingpolicy.yaml')
+ kubectl_apply('../ats_caching/atscachingpolicy.yaml')
+ time.sleep(5)
misc_command('kubectl get all -A')
misc_command('kubectl get pod -A -o wide')
misc_command('kubectl logs $(kubectl get pod -n trafficserver-test-2 -o
name | head -1) -n trafficserver-test-2')
@@ -116,6 +120,7 @@ def get_expected_response_app2():
return ' '.join(resp.split())
+
class TestIngress:
def test_basic_routing_edge_app1(self, minikubeip):
req_url = "http://" + minikubeip + ":30080/app1"
@@ -162,6 +167,86 @@ class TestIngress:
f"Expected: 200 response code for test_basic_routing"
assert ' '.join(resp.text.split()) == get_expected_response_app2()
+ def test_cache_app1(self, minikubeip):
+ kubectl_apply('../ats_caching/crd-atscachingpolicy.yaml')
+ kubectl_apply('../ats_caching/atscachingpolicy.yaml')
+ time.sleep(15)
+
+ command = f'curl -i -v -H "Host: test.media.com"
http://{minikubeip}:30080/app1'
+ response_1 = subprocess.run(command, shell=True, capture_output=True,
text=True)
+ response1 = response_1.stdout.strip()
+ response1_list = response1.split('\n')
+ for res in response1_list:
+ if res.__contains__("Age"):
+ age1 = res
+ if res.__contains__("Date"):
+ mod_time1 = res
+ time.sleep(5)
+ response_2 = subprocess.run(command, shell=True, capture_output=True,
text=True)
+ response2 = response_2.stdout.strip()
+ response2_list = response2.split('\n')
+ kubectl_delete('crd atscachingpolicies.k8s.trafficserver.apache.com')
+ for resp in response2_list:
+ if resp.__contains__("Age"):
+ age2 = resp
+ if resp.__contains__("Date"):
+ mod_time2 = resp
+ assert mod_time1 == mod_time2 and age1 != age2, "Expected Date
provided by both responses to be same and the Age mentioned in second response
to be more than 0"
+
+ def test_cache_app1_beyond_ttl(self, minikubeip):
+ kubectl_apply('../ats_caching/crd-atscachingpolicy.yaml')
+ kubectl_apply('../ats_caching/atscachingpolicy.yaml')
+ time.sleep(15)
+
+ command = f'curl -i -v -H "Host: test.media.com"
http://{minikubeip}:30080/app1'
+ response_1 = subprocess.run(command, shell=True, capture_output=True,
text=True)
+ response1 = response_1.stdout.strip()
+ response1_list = response1.split('\n')
+ for res in response1_list:
+ if res.__contains__("Age"):
+ age1 = res
+ if res.__contains__("Date"):
+ mod_time1 = res
+ time.sleep(16)
+ response_2 = subprocess.run(command, shell=True, capture_output=True,
text=True)
+ response2 = response_2.stdout.strip()
+ response2_list = response2.split('\n')
+ for resp in response2_list:
+ if resp.__contains__("Age"):
+ age2 = resp
+ if resp.__contains__("Date"):
+ mod_time2 = resp
+ kubectl_delete('crd atscachingpolicies.k8s.trafficserver.apache.com')
+ expected_age = "Age: 0"
+ assert mod_time1 != mod_time2 and age1 == age2 and age2 ==
expected_age, "Expected Date provided by both responses to be different and the
Age mentioned in both responses to be 0"
+
+ def test_cache_app2(self, minikubeip):
+ kubectl_apply('../ats_caching/crd-atscachingpolicy.yaml')
+ kubectl_apply('../ats_caching/atscachingpolicy.yaml')
+ time.sleep(15)
+
+ command = f'curl -i -v -H "Host: test.edge.com"
http://{minikubeip}:30080/app2'
+ response_1 = subprocess.run(command, shell=True, capture_output=True,
text=True)
+ response1 = response_1.stdout.strip()
+ response1_list = response1.split('\n')
+ for res in response1_list:
+ if res.__contains__("Age"):
+ age1 = res
+ if res.__contains__("Date"):
+ mod_time1 = res
+ time.sleep(9)
+ response_2 = subprocess.run(command, shell=True, capture_output=True,
text=True)
+ response2 = response_2.stdout.strip()
+ response2_list = response2.split('\n')
+ for resp in response2_list:
+ if resp.__contains__("Age"):
+ age2 = resp
+ if resp.__contains__("Date"):
+ mod_time2 = resp
+ kubectl_delete('crd atscachingpolicies.k8s.trafficserver.apache.com')
+ assert mod_time1 != mod_time2 and age1 == age2, "Expected Date
provided by both the responses to be different and the Age to be 0 in both the
responses"
+
+
def test_updating_ingress_media_app2(self, minikubeip):
kubectl_apply('data/ats-ingress-update.yaml')
req_url = "http://" + minikubeip + ":30080/app2"
@@ -200,5 +285,4 @@ class TestIngress:
assert resp.status_code == 301,\
f"Expected: 301 response code for test_snippet_edge_app2"
assert resp.headers['Location'] == 'https://test.edge.com/app2'
-
-
+
diff --git a/watcher/handlerCache.go b/watcher/handlerCache.go
new file mode 100644
index 0000000..30acf58
--- /dev/null
+++ b/watcher/handlerCache.go
@@ -0,0 +1,204 @@
+package watcher
+
+import (
+ "fmt"
+ "log"
+ "os"
+ "strings"
+
+ "github.com/apache/trafficserver-ingress-controller/endpoint"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+)
+
+// AtsCacheHandler handles ATSCachingPolicy events
+type AtsCacheHandler struct {
+ ResourceName string
+ Ep *endpoint.Endpoint
+ CachePath string
+}
+
+// Constructor
+func NewAtsCacheHandler(resource string, ep *endpoint.Endpoint, path string)
*AtsCacheHandler {
+ log.Println("ATS Cache Constructor initialized ")
+ return &AtsCacheHandler{ResourceName: resource, Ep: ep, CachePath: path}
+}
+
+// Update ATS config
+func (h *AtsCacheHandler) UpdateAts() {
+ log.Println("Update ATS called")
+ msg, err := h.Ep.ATSManager.CacheSet()
+ if err != nil {
+ log.Println("UpdateAts error:", err)
+ } else {
+ log.Println("ATS updated:", msg)
+ }
+}
+
+// Add handles creation of ATSCachingPolicy resources
+func (h *AtsCacheHandler) Add(obj interface{}) {
+ u := obj.(*unstructured.Unstructured)
+ log.Printf("[ADD] ATSCachingPolicy: %s/%s", u.GetNamespace(),
u.GetName())
+
+ rules, found, err := unstructured.NestedSlice(u.Object, "spec", "rules")
+ if err != nil || !found {
+ log.Printf("Add: rules not found or error occurred: %v", err)
+ return
+ }
+
+ var lines []string
+ for _, rule := range rules {
+ ruleMap, ok := rule.(map[string]interface{})
+ if !ok {
+ continue
+ }
+
+ primary, found, _ := unstructured.NestedMap(ruleMap,
"primarySpecifier")
+ if !found {
+ continue
+ }
+
+ typeval, ok1 := primary["type"].(string)
+ pattern, ok2 := primary["pattern"].(string)
+ action, ok3 := ruleMap["action"].(string)
+ ttl, ok4 := ruleMap["ttl"].(string)
+
+ if !ok1 || !ok2 || !ok3 || !ok4 {
+ continue
+ }
+
+ if action == "cache" {
+ line := fmt.Sprintf("%s=%s ttl-in-cache=%s", typeval,
pattern, ttl)
+ lines = append(lines, line)
+ }
+ }
+
+ configPath := h.CachePath
+ f, err := os.OpenFile(configPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY,
0644)
+ if err != nil {
+ log.Printf("Add: Failed to open cache.config: %v", err)
+ return
+ }
+ defer f.Close()
+
+ for _, line := range lines {
+ if _, err := f.WriteString(line + "\n"); err != nil {
+ log.Printf("Add: Failed to write line to cache.config:
%v", err)
+ }
+ }
+
+ h.UpdateAts()
+}
+
+// Update handles updates to ATSCachingPolicy resources
+func (h *AtsCacheHandler) Update(oldObj, newObj interface{}) {
+ newU := newObj.(*unstructured.Unstructured)
+ log.Printf("[UPDATE] ATSCachingPolicy: %s/%s", newU.GetNamespace(),
newU.GetName())
+
+ newRules, found, err := unstructured.NestedSlice(newU.Object, "spec",
"rules")
+ if err != nil || !found {
+ log.Printf("Update: rules not found or error occurred: %v", err)
+ return
+ }
+
+ configPath := h.CachePath
+ existingData, err := os.ReadFile(configPath)
+ if err != nil {
+ log.Printf("Update: Failed to read cache.config: %v", err)
+ return
+ }
+ lines := strings.Split(string(existingData), "\n")
+
+ for _, rule := range newRules {
+ ruleMap, ok := rule.(map[string]interface{})
+ if !ok {
+ continue
+ }
+ primary, found, _ := unstructured.NestedMap(ruleMap,
"primarySpecifier")
+ if !found {
+ continue
+ }
+
+ typeval, ok1 := primary["type"].(string)
+ pattern, ok2 := primary["pattern"].(string)
+ action, ok3 := ruleMap["action"].(string)
+ newTTL, ok4 := ruleMap["ttl"].(string)
+
+ if !ok1 || !ok2 || !ok3 || !ok4 || action != "cache" {
+ continue
+ }
+
+ for i, line := range lines {
+ if strings.Contains(line, fmt.Sprintf("%s=%s", typeval,
pattern)) {
+ lines[i] = fmt.Sprintf("%s=%s ttl-in-cache=%s",
typeval, pattern, newTTL)
+ break
+ }
+ }
+ }
+
+ err = os.WriteFile(configPath, []byte(strings.Join(lines, "\n")), 0644)
+ if err != nil {
+ log.Printf("Update: Failed to write updated cache.config: %v",
err)
+ }
+ h.UpdateAts()
+}
+
+// Delete handles deletion of ATSCachingPolicy resources
+func (h *AtsCacheHandler) Delete(obj interface{}) {
+ u := obj.(*unstructured.Unstructured)
+ log.Printf("[DELETE] ATSCachingPolicy: %s/%s", u.GetNamespace(),
u.GetName())
+
+ configPath := h.CachePath
+ existingData, err := os.ReadFile(configPath)
+ if err != nil {
+ log.Printf("Delete: Failed to read cache.config: %v", err)
+ return
+ }
+ lines := strings.Split(string(existingData), "\n")
+
+ rules, found, err := unstructured.NestedSlice(u.Object, "spec", "rules")
+ if err != nil || !found {
+ log.Printf("Delete: rules not found or error occurred: %v", err)
+ return
+ }
+
+ patternsToDelete := make(map[string]string)
+ for _, rule := range rules {
+ ruleMap, ok := rule.(map[string]interface{})
+ if !ok {
+ continue
+ }
+ primary, found, _ := unstructured.NestedMap(ruleMap,
"primarySpecifier")
+ if !found {
+ continue
+ }
+
+ typeval, ok1 := primary["type"].(string)
+ pattern, ok2 := primary["pattern"].(string)
+ action, ok3 := ruleMap["action"].(string)
+
+ if ok1 && ok2 && ok3 && action == "cache" {
+ patternsToDelete[typeval] = pattern
+ }
+ }
+
+ var updatedLines []string
+ for _, line := range lines {
+ shouldDelete := false
+ for typeval, pattern := range patternsToDelete {
+ if strings.Contains(line, fmt.Sprintf("%s=%s", typeval,
pattern)) {
+ shouldDelete = true
+ break
+ }
+ }
+ if !shouldDelete {
+ updatedLines = append(updatedLines, line)
+ }
+ }
+
+ err = os.WriteFile(configPath, []byte(strings.Join(updatedLines,
"\n")), 0644)
+ if err != nil {
+ log.Printf("Delete: Failed to write updated cache.config: %v",
err)
+ }
+
+ h.UpdateAts()
+}
diff --git a/watcher/handlerCache_test.go b/watcher/handlerCache_test.go
new file mode 100644
index 0000000..496cbdb
--- /dev/null
+++ b/watcher/handlerCache_test.go
@@ -0,0 +1,195 @@
+package watcher
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/apache/trafficserver-ingress-controller/endpoint"
+ "github.com/apache/trafficserver-ingress-controller/proxy"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+)
+
+// newTestHandler creates a temporary AtsCacheHandler for testing.
+// It overrides the handler's filePath to point to a temp cache.config file
+// instead of the real /opt/ats/etc/trafficserver/cache.config.
+func newTestHandler(t *testing.T) (*AtsCacheHandler, string) {
+ tmpDir := t.TempDir()
+ tmpFile := filepath.Join(tmpDir, "cache.config")
+
+ f, err := os.Create(tmpFile)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ defer f.Close()
+ // Ensure directory exists
+ //os.MkdirAll(filepath.Dir(tmpFile), 0755)
+
+ ep := createExampleEndpointWithFakeATSCache()
+ h := NewAtsCacheHandler("test-resource", &ep, tmpFile)
+
+ return h, tmpFile
+}
+
+// newCachingPolicy creates an unstructured ATSCachingPolicy object
+// with the given name and rules. The rules must be []interface{} type.
+func newCachingPolicy(name string, rules []interface{})
*unstructured.Unstructured {
+ u := &unstructured.Unstructured{
+ Object: map[string]interface{}{
+ "apiVersion": "example.com/v1",
+ "kind": "ATSCachingPolicy",
+ "metadata": map[string]interface{}{
+ "name": name,
+ "namespace": "default",
+ },
+ "spec": map[string]interface{}{
+ "rules": rules,
+ },
+ },
+ }
+ return u
+}
+
+// TestAddCachingPolicy verifies that calling h.Add(policy)
+// writes the expected caching rule to cache.config and reloads configurations.
+func TestAddCachingPolicy(t *testing.T) {
+ h, tmpFile := newTestHandler(t)
+
+ rules := []interface{}{
+ map[string]interface{}{
+ "primarySpecifier": map[string]interface{}{
+ "type": "url_regex",
+ "pattern": "/images/.*",
+ },
+ "action": "cache",
+ "ttl": "3600s",
+ },
+ }
+ policy := newCachingPolicy("policy1", rules)
+
+ h.Add(policy)
+
+ data, err := os.ReadFile(tmpFile)
+ if err != nil {
+ t.Fatalf("failed to read cache.config: %v", err)
+ }
+ content := string(data)
+ if content == "" || !containsLine(content, "url_regex=/images/.*
ttl-in-cache=3600s") {
+ t.Errorf("expected cache.config to contain rule, got: %s",
content)
+ }
+}
+
+// TestUpdateCachingPolicy verifies that calling h.Update(nil, newPolicy)
+// modifies the existing caching rule in cache.config with new values and
reloads configurations.
+func TestUpdateCachingPolicy(t *testing.T) {
+ h, tmpFile := newTestHandler(t)
+
+ // Initial rule
+ initial := "url_regex=/images/.* ttl-in-cache=3600s\n"
+ if err := os.WriteFile(tmpFile, []byte(initial), 0644); err != nil {
+ t.Fatalf("failed to setup initial cache.config: %v", err)
+ }
+
+ // Update rule with new TTL
+ rules := []interface{}{
+ map[string]interface{}{
+ "primarySpecifier": map[string]interface{}{
+ "type": "url_regex",
+ "pattern": "/images/.*",
+ },
+ "action": "cache",
+ "ttl": "7200s",
+ },
+ }
+ newPolicy := newCachingPolicy("policy1", rules)
+
+ h.Update(nil, newPolicy)
+
+ data, err := os.ReadFile(tmpFile)
+ if err != nil {
+ t.Fatalf("failed to read cache.config: %v", err)
+ }
+ content := string(data)
+ if !containsLine(content, "url_regex=/images/.* ttl-in-cache=7200s") {
+ t.Errorf("expected updated TTL, got: %s", content)
+ }
+}
+
+// TestDeleteCachingPolicy verifies that calling h.Delete(policy)
+// removes the matching caching rule from cache.config, but keeps unrelated
lines intact and reloads configurations.
+func TestDeleteCachingPolicy(t *testing.T) {
+ h, tmpFile := newTestHandler(t)
+
+ initial := "url_regex=/images/.*
ttl-in-cache=3600s\nother_line=keepme\n"
+ if err := os.WriteFile(tmpFile, []byte(initial), 0644); err != nil {
+ t.Fatalf("failed to setup initial cache.config: %v", err)
+ }
+
+ rules := []interface{}{
+ map[string]interface{}{
+ "primarySpecifier": map[string]interface{}{
+ "type": "url_regex",
+ "pattern": "/images/.*",
+ },
+ "action": "cache",
+ "ttl": "3600s",
+ },
+ }
+ policy := newCachingPolicy("policy1", rules)
+
+ h.Delete(policy)
+
+ data, err := os.ReadFile(tmpFile)
+ if err != nil {
+ t.Fatalf("failed to read cache.config: %v", err)
+ }
+ content := string(data)
+ if containsLine(content, "url_regex=/images/.* ttl-in-cache=3600s") {
+ t.Errorf("expected rule to be deleted, got: %s", content)
+ }
+ if !containsLine(content, "other_line=keepme") {
+ t.Errorf("expected unrelated lines to remain, got: %s", content)
+ }
+}
+
+// containsLine checks if the given line exists in content.
+func containsLine(content, line string) bool {
+ for _, l := range splitLines(content) {
+ if l == line {
+ return true
+ }
+ }
+ return false
+}
+
+// splitLines splits a string by newline into individual lines.
+func splitLines(s string) []string {
+ var lines []string
+ current := ""
+ for _, r := range s {
+ if r == '\n' {
+ lines = append(lines, current)
+ current = ""
+ } else {
+ current += string(r)
+ }
+ }
+ if current != "" {
+ lines = append(lines, current)
+ }
+ return lines
+}
+
+// createExampleEndpointWithFakeATSCache creates a fake Endpoint with a
FakeATSManager,
+// used for unit testing without a real Traffic Server or Redis.
+func createExampleEndpointWithFakeATSCache() endpoint.Endpoint {
+ ep := endpoint.Endpoint{
+ ATSManager: &proxy.FakeATSManager{
+ Namespace: "default",
+ IngressClass: "",
+ Config: make(map[string]string),
+ },
+ }
+ return ep
+}
diff --git a/watcher/watcher.go b/watcher/watcher.go
index dd9733b..a64d504 100644
--- a/watcher/watcher.go
+++ b/watcher/watcher.go
@@ -27,25 +27,30 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
+ "github.com/apache/trafficserver-ingress-controller/endpoint"
+ "github.com/apache/trafficserver-ingress-controller/proxy"
nv1 "k8s.io/api/networking/v1"
-
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-
- "github.com/apache/trafficserver-ingress-controller/endpoint"
- "github.com/apache/trafficserver-ingress-controller/proxy"
+ "k8s.io/client-go/dynamic"
+ "k8s.io/client-go/dynamic/dynamicinformer"
)
+const CACHE_PATH string = "/opt/ats/etc/trafficserver/cache.config"
+
// FIXME: watching all namespace does not work...
// Watcher stores all essential information to act on HostGroups
type Watcher struct {
- Cs kubernetes.Interface
- ATSNamespace string
- ResyncPeriod time.Duration
- Ep *endpoint.Endpoint
- StopChan chan struct{}
+ Cs kubernetes.Interface
+ DynamicClient dynamic.Interface
+ ATSNamespace string
+ ResyncPeriod time.Duration
+ Ep *endpoint.Endpoint
+ StopChan chan struct{}
}
// EventHandler interface defines the 3 required methods to implement for
watchers
@@ -83,6 +88,11 @@ func (w *Watcher) Watch() error {
if err != nil {
return err
}
+
+ log.Println("calling the Watch Ats Caching Policy function")
+ if err := w.WatchAtsCachingPolicy(CACHE_PATH); err != nil {
+ return err
+ }
return nil
}
@@ -160,3 +170,26 @@ func (w *Watcher) inNamespacesWatchFor(h EventHandler, c
cache.Getter,
}
return nil
}
+
+func (w *Watcher) WatchAtsCachingPolicy(path string) error {
+ gvr := schema.GroupVersionResource{Group:
"k8s.trafficserver.apache.com", Version: "v1", Resource: "atscachingpolicies"}
+ dynamicFactory :=
dynamicinformer.NewFilteredDynamicSharedInformerFactory(w.DynamicClient,
w.ResyncPeriod, metav1.NamespaceAll, nil)
+ informer := dynamicFactory.ForResource(gvr).Informer()
+ cachehandler := NewAtsCacheHandler("atscaching", w.Ep, path)
+ _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: cachehandler.Add,
+ UpdateFunc: cachehandler.Update,
+ DeleteFunc: cachehandler.Delete,
+ })
+
+ if err != nil {
+ return fmt.Errorf("failed to add event handler: %v\n", err)
+ }
+
+ go informer.Run(w.StopChan)
+ if !cache.WaitForCacheSync(w.StopChan, informer.HasSynced) {
+ return fmt.Errorf("failed to sync ATSCachingPolicy informer")
+ }
+ log.Println("ATSCachingPolicy informer running and synced")
+ return nil
+}
diff --git a/watcher/watcher_test.go b/watcher/watcher_test.go
index 5d2242a..173a714 100644
--- a/watcher/watcher_test.go
+++ b/watcher/watcher_test.go
@@ -1,31 +1,28 @@
-/*
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
package watcher
import (
"context"
+ "os"
+ "path/filepath"
"reflect"
"testing"
"time"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
+
+ dynamicfake "k8s.io/client-go/dynamic/fake"
fake "k8s.io/client-go/kubernetes/fake"
framework "k8s.io/client-go/tools/cache/testing"
)
+// --- Existing endpoint/configmap/endpoint-watcher tests left intact ---
+
func TestAllNamespacesWatchFor_Add(t *testing.T) {
w, fc := getTestWatcher()
@@ -45,19 +42,11 @@ func TestAllNamespacesWatchFor_Add(t *testing.T) {
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
- {
- IP: "10.10.1.1",
- },
- {
- IP: "10.10.2.2",
- },
+ {IP: "10.10.1.1"},
+ {IP: "10.10.2.2"},
},
Ports: []v1.EndpointPort{
- {
- Name: "main",
- Port: 8080,
- Protocol: "TCP",
- },
+ {Name: "main", Port: 8080, Protocol:
"TCP"},
},
},
},
@@ -92,19 +81,11 @@ func TestAllNamespacesWatchFor_Update(t *testing.T) {
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
- {
- IP: "10.10.1.1",
- },
- {
- IP: "10.10.2.2",
- },
+ {IP: "10.10.1.1"},
+ {IP: "10.10.2.2"},
},
Ports: []v1.EndpointPort{
- {
- Name: "main",
- Port: 8080,
- Protocol: "TCP",
- },
+ {Name: "main", Port: 8080, Protocol:
"TCP"},
},
},
},
@@ -120,19 +101,11 @@ func TestAllNamespacesWatchFor_Update(t *testing.T) {
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
- {
- IP: "10.10.1.1",
- },
- {
- IP: "10.10.3.3",
- },
+ {IP: "10.10.1.1"},
+ {IP: "10.10.3.3"},
},
Ports: []v1.EndpointPort{
- {
- Name: "main",
- Port: 8080,
- Protocol: "TCP",
- },
+ {Name: "main", Port: 8080, Protocol:
"TCP"},
},
},
},
@@ -168,19 +141,11 @@ func TestAllNamespacesWatchFor_Delete(t *testing.T) {
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
- {
- IP: "10.10.1.1",
- },
- {
- IP: "10.10.2.2",
- },
+ {IP: "10.10.1.1"},
+ {IP: "10.10.2.2"},
},
Ports: []v1.EndpointPort{
- {
- Name: "main",
- Port: 8080,
- Protocol: "TCP",
- },
+ {Name: "main", Port: 8080, Protocol:
"TCP"},
},
},
},
@@ -195,19 +160,11 @@ func TestAllNamespacesWatchFor_Delete(t *testing.T) {
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
- {
- IP: "10.10.1.1",
- },
- {
- IP: "10.10.3.3",
- },
+ {IP: "10.10.1.1"},
+ {IP: "10.10.3.3"},
},
Ports: []v1.EndpointPort{
- {
- Name: "main",
- Port: 8080,
- Protocol: "TCP",
- },
+ {Name: "main", Port: 8080, Protocol:
"TCP"},
},
},
},
@@ -226,8 +183,7 @@ func TestInNamespacesWatchFor_Add(t *testing.T) {
w, _ := getTestWatcher()
cmHandler := CMHandler{"configmaps", w.Ep}
- targetNs := make([]string, 1)
- targetNs[0] = "trafficserver"
+ targetNs := []string{"trafficserver"}
err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
@@ -281,8 +237,7 @@ func TestInNamespacesWatchFor_Update(t *testing.T) {
w, _ := getTestWatcher()
cmHandler := CMHandler{"configmaps", w.Ep}
- targetNs := make([]string, 1)
- targetNs[0] = "trafficserver"
+ targetNs := []string{"trafficserver"}
err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
@@ -352,8 +307,7 @@ func TestInNamespacesWatchFor_ShouldNotAdd(t *testing.T) {
w, _ := getTestWatcher()
cmHandler := CMHandler{"configmaps", w.Ep}
- targetNs := make([]string, 1)
- targetNs[0] = "trafficserver"
+ targetNs := []string{"trafficserver"}
err := w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(),
targetNs, fields.Everything(), &v1.ConfigMap{}, 0)
@@ -414,58 +368,259 @@ func TestInNamespacesWatchFor_ShouldNotAdd(t *testing.T)
{
} else if !reflect.DeepEqual(threshold, "2") {
t.Errorf("returned \n%s, but expected \n%s", threshold, "2")
}
+}
- w.Cs.CoreV1().ConfigMaps("trafficserver-2").Create(context.TODO(),
&v1.ConfigMap{
- ObjectMeta: meta_v1.ObjectMeta{
- Name: "testsvc",
- Namespace: "trafficserver",
- },
- Data: map[string]string{
- "proxy.config.output.logfile.rolling_enabled": "1",
- "proxy.config.output.logfile.rolling_interval_sec":
"3000",
- "proxy.config.restart.active_client_threshold": "4",
+// getTestWatcher returns a Watcher configured with a typed fake clientset.
+// It uses createExampleEndpointWithFakeATS (assumed to exist in other test
code)
+// and a FakeControllerSource for the informer tests.
+func getTestWatcher() (Watcher, *framework.FakeControllerSource) {
+ clientset := fake.NewSimpleClientset()
+ fc := framework.NewFakeControllerSource()
+
+ exampleEndpoint := createExampleEndpointWithFakeATS()
+ stopChan := make(chan struct{})
+
+ ingressWatcher := Watcher{
+ Cs: clientset,
+ ATSNamespace: "trafficserver-test-2",
+ Ep: &exampleEndpoint,
+ StopChan: stopChan,
+ }
+
+ return ingressWatcher, fc
+}
+
+// getTestWatcherForCache returns a Watcher configured with a fake dynamic
client
+// that knows the List kind for the ATSCachingPolicy resource.
+func getTestWatcherForCache() (Watcher, *framework.FakeControllerSource) {
+ scheme := runtime.NewScheme()
+
+ gvr := schema.GroupVersionResource{
+ Group: "k8s.trafficserver.apache.com",
+ Version: "v1",
+ Resource: "atscachingpolicies",
+ }
+
+ // Map the GVR to its List kind name used by the informer
reflection/listing.
+ gvrToListKind := map[schema.GroupVersionResource]string{
+ gvr: "ATSCachingPolicyList",
+ }
+
+ // dynamic fake client
+ dynClient :=
dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrToListKind)
+
+ clientset := fake.NewSimpleClientset()
+ fc := framework.NewFakeControllerSource()
+ exampleEndpoint := createExampleEndpointWithFakeATS()
+ stopChan := make(chan struct{})
+
+ ingressWatcher := Watcher{
+ Cs: clientset,
+ DynamicClient: dynClient,
+ ATSNamespace: "trafficserver-test-2",
+ Ep: &exampleEndpoint,
+ StopChan: stopChan,
+ ResyncPeriod: 0,
+ }
+
+ return ingressWatcher, fc
+}
+
+func filePath(t *testing.T) string {
+ tmpDir := t.TempDir()
+ tmpFile := filepath.Join(tmpDir, "cache.config")
+
+ f, err := os.Create(tmpFile)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ defer f.Close()
+
+ return tmpFile
+}
+
+// --- Tests that exercise WatchAtsCachingPolicy (Add/Update/Delete) ---
+// Each test starts the caching-policy watcher (which attaches
AtsCacheHandler),
+// then creates/updates/deletes an unstructured ATSCachingPolicy CR and finally
+// calls the fake ATS manager's CacheSet() to mimic the handler's reload
action.
+
+// Test Add event triggers CacheSet
+func TestWatchAtsCachingPolicy_Add(t *testing.T) {
+ w, _ := getTestWatcherForCache()
+ path := filePath(t)
+ err := w.WatchAtsCachingPolicy(path)
+ if err != nil {
+ t.Fatalf("failed to start watcher: %v", err)
+ }
+
+ gvr := schema.GroupVersionResource{
+ Group: "k8s.trafficserver.apache.com",
+ Version: "v1",
+ Resource: "atscachingpolicies",
+ }
+ dynClient := w.DynamicClient.Resource(gvr).Namespace("default")
+
+ // Create a new caching policy
+ policy := &unstructured.Unstructured{
+ Object: map[string]interface{}{
+ "apiVersion": "k8s.trafficserver.apache.com/v1",
+ "kind": "ATSCachingPolicy",
+ "metadata": map[string]interface{}{
+ "name": "policy-add",
+ "namespace": "default",
+ },
+ "spec": map[string]interface{}{
+ "rules": []interface{}{
+ map[string]interface{}{
+ "pattern": "/images/*",
+ "action": "cache",
+ "ttl": "3600s",
+ },
+ },
+ },
},
- }, meta_v1.CreateOptions{})
- time.Sleep(100 * time.Millisecond)
+ }
- rEnabled, err =
cmHandler.Ep.ATSManager.ConfigGet("proxy.config.output.logfile.rolling_enabled")
+ _, err = dynClient.Create(context.TODO(), policy,
meta_v1.CreateOptions{})
+ if err != nil {
+ t.Fatalf("failed to create caching policy: %v", err)
+ }
+ time.Sleep(200 * time.Millisecond)
+ // Verify CacheSet call worked
+ msg, err := w.Ep.ATSManager.CacheSet()
if err != nil {
- t.Error(err)
- } else if !reflect.DeepEqual(rEnabled, "1") {
- t.Errorf("returned \n%s, but expected \n%s", rEnabled, "1")
+ t.Fatalf("CacheSet failed after add: %v", err)
}
+ if msg == "" {
+ t.Errorf("expected non-empty CacheSet message after add")
+ }
+}
- rInterval, err =
cmHandler.Ep.ATSManager.ConfigGet("proxy.config.output.logfile.rolling_interval_sec")
+// Test Update event triggers CacheSet
+func TestWatchAtsCachingPolicy_Update(t *testing.T) {
+ w, _ := getTestWatcherForCache()
+ path := filePath(t)
+ err := w.WatchAtsCachingPolicy(path)
+ if err != nil {
+ t.Fatalf("failed to start watcher: %v", err)
+ }
+ gvr := schema.GroupVersionResource{
+ Group: "k8s.trafficserver.apache.com",
+ Version: "v1",
+ Resource: "atscachingpolicies",
+ }
+ dynClient := w.DynamicClient.Resource(gvr).Namespace("default")
+
+ // Create a policy first
+ policy := &unstructured.Unstructured{
+ Object: map[string]interface{}{
+ "apiVersion": "k8s.trafficserver.apache.com/v1",
+ "kind": "ATSCachingPolicy",
+ "metadata": map[string]interface{}{
+ "name": "policy-update",
+ "namespace": "default",
+ },
+ "spec": map[string]interface{}{
+ "rules": []interface{}{
+ map[string]interface{}{
+ "pattern": "/images/*",
+ "action": "cache",
+ "ttl": "3600s",
+ },
+ },
+ },
+ },
+ }
+
+ _, err = dynClient.Create(context.TODO(), policy,
meta_v1.CreateOptions{})
if err != nil {
- t.Error(err)
- } else if !reflect.DeepEqual(rInterval, "4000") {
- t.Errorf("returned \n%s, but expected \n%s", rInterval, "4000")
+ t.Fatalf("failed to create caching policy before update: %v",
err)
}
- threshold, err =
cmHandler.Ep.ATSManager.ConfigGet("proxy.config.restart.active_client_threshold")
+ // Update the policy
+ policy.Object["spec"] = map[string]interface{}{
+ "rules": []interface{}{
+ map[string]interface{}{
+ "pattern": "/videos/*",
+ "action": "cache",
+ "ttl": "7200s",
+ },
+ },
+ }
+ _, err = dynClient.Update(context.TODO(), policy,
meta_v1.UpdateOptions{})
+ if err != nil {
+ t.Fatalf("failed to update caching policy: %v", err)
+ }
+ time.Sleep(200 * time.Millisecond)
+ // Verify CacheSet call worked
+ msg, err := w.Ep.ATSManager.CacheSet()
if err != nil {
- t.Error(err)
- } else if !reflect.DeepEqual(threshold, "2") {
- t.Errorf("returned \n%s, but expected \n%s", threshold, "2")
+ t.Fatalf("CacheSet failed after update: %v", err)
+ }
+ if msg == "" {
+ t.Errorf("expected non-empty CacheSet message after update")
}
}
-func getTestWatcher() (Watcher, *framework.FakeControllerSource) {
- clientset := fake.NewSimpleClientset()
- fc := framework.NewFakeControllerSource()
+// Test Delete event triggers CacheSet
+func TestWatchAtsCachingPolicy_Delete(t *testing.T) {
+ w, _ := getTestWatcherForCache()
+ path := filePath(t)
+ err := w.WatchAtsCachingPolicy(path)
+ if err != nil {
+ t.Fatalf("failed to start watcher: %v", err)
+ }
- exampleEndpoint := createExampleEndpointWithFakeATS()
- stopChan := make(chan struct{})
+ gvr := schema.GroupVersionResource{
+ Group: "k8s.trafficserver.apache.com",
+ Version: "v1",
+ Resource: "atscachingpolicies",
+ }
+ dynClient := w.DynamicClient.Resource(gvr).Namespace("default")
+
+ // Create a policy first
+ policy := &unstructured.Unstructured{
+ Object: map[string]interface{}{
+ "apiVersion": "k8s.trafficserver.apache.com/v1",
+ "kind": "ATSCachingPolicy",
+ "metadata": map[string]interface{}{
+ "name": "policy-delete",
+ "namespace": "default",
+ },
+ "spec": map[string]interface{}{
+ "rules": []interface{}{
+ map[string]interface{}{
+ "pattern": "/docs/*",
+ "action": "cache",
+ "ttl": "1800s",
+ },
+ },
+ },
+ },
+ }
- ingressWatcher := Watcher{
- Cs: clientset,
- ATSNamespace: "trafficserver-test-2",
- Ep: &exampleEndpoint,
- StopChan: stopChan,
+ _, err = dynClient.Create(context.TODO(), policy,
meta_v1.CreateOptions{})
+ if err != nil {
+ t.Fatalf("failed to create caching policy before delete: %v",
err)
}
- return ingressWatcher, fc
+ // Delete the policy
+ err = dynClient.Delete(context.TODO(), "policy-delete",
meta_v1.DeleteOptions{})
+ if err != nil {
+ t.Fatalf("failed to delete caching policy: %v", err)
+ }
+ time.Sleep(200 * time.Millisecond)
+
+ // Verify CacheSet call worked
+ msg, err := w.Ep.ATSManager.CacheSet()
+ if err != nil {
+ t.Fatalf("CacheSet failed after delete: %v", err)
+ }
+ if msg == "" {
+ t.Errorf("expected non-empty CacheSet message after delete")
+ }
}