This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/master by this push: new 44f1e40 Pod log implementation 44f1e40 is described below commit 44f1e40dcf038ada075f57ef073c3f113d7adbcb Author: nferraro <ni.ferr...@gmail.com> AuthorDate: Tue Sep 18 01:32:08 2018 +0200 Pod log implementation --- Gopkg.lock | 1 + pkg/client/cmd/run.go | 25 +++++ pkg/util/log/annotation_scraper.go | 166 +++++++++++++++++++++++++++++++ pkg/util/log/pod_scraper.go | 175 +++++++++++++++++++++++++++++++++ test/build_manager_integration_test.go | 8 +- test/local_builder_integration_test.go | 14 +-- test/log_scrape_integration_test.go | 106 ++++++++++++++++++++ test/testing_env.go | 118 +++++++++++++++++++++- 8 files changed, 599 insertions(+), 14 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index bc54a02..823b59c 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -736,6 +736,7 @@ "k8s.io/apimachinery/pkg/runtime/serializer/json", "k8s.io/apimachinery/pkg/runtime/serializer/versioning", "k8s.io/apimachinery/pkg/util/yaml", + "k8s.io/apimachinery/pkg/watch", "k8s.io/client-go/plugin/pkg/client/auth/gcp", "k8s.io/client-go/rest", "k8s.io/client-go/tools/clientcmd", diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go index c4df815..171ed1d 100644 --- a/pkg/client/cmd/run.go +++ b/pkg/client/cmd/run.go @@ -32,6 +32,8 @@ import ( "github.com/spf13/cobra" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/apache/camel-k/pkg/util/log" + "io" ) // NewCmdRun -- @@ -56,6 +58,7 @@ func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command { cmd.Flags().StringSliceVarP(&options.Properties, "property", "p", nil, "Add a camel property") cmd.Flags().StringSliceVar(&options.ConfigMaps, "configmap", nil, "Add a ConfigMap") cmd.Flags().StringSliceVar(&options.Secrets, "secret", nil, "Add a Secret") + cmd.Flags().BoolVar(&options.Logs, "logs", false, "Print integration logs") return &cmd } @@ -70,6 +73,7 @@ type runCmdOptions struct { ConfigMaps []string Secrets []string Wait bool + Logs bool } func (*runCmdOptions) validateArgs(cmd *cobra.Command, args []string) error { @@ -96,6 +100,12 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args []string) error { return err } } + if o.Logs { + err = o.printLogs(integration) + if err != nil { + return err + } + } return nil } @@ -137,6 +147,21 @@ watcher: return nil } +func (o *runCmdOptions) printLogs(integration *v1alpha1.Integration) error { + scraper := log.NewSelectorScraper(integration.Namespace, "camel.apache.org/integration=" + integration.Name) + reader := scraper.Start(o.Context) + for { + str, err := reader.ReadString('\n') + if err == io.EOF || o.Context.Err() != nil { + break + } else if err != nil { + return err + } + fmt.Print(str) + } + return nil +} + func (o *runCmdOptions) createIntegration(cmd *cobra.Command, args []string) (*v1alpha1.Integration, error) { code, err := o.loadCode(args[0]) if err != nil { diff --git a/pkg/util/log/annotation_scraper.go b/pkg/util/log/annotation_scraper.go new file mode 100644 index 0000000..2e47b5e --- /dev/null +++ b/pkg/util/log/annotation_scraper.go @@ -0,0 +1,166 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package log + +import ( + "bufio" + "io" + "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/operator-framework/operator-sdk/pkg/sdk" + "k8s.io/api/core/v1" + "sync" + "sync/atomic" + "strconv" + "github.com/sirupsen/logrus" + "time" +) + +// SelectorScraper scrapes all pods with a given selector +type SelectorScraper struct { + namespace string + labelSelector string + podScrapers sync.Map + counter uint64 +} + +// NewSelectorScraper creates a new SelectorScraper +func NewSelectorScraper(namespace string, labelSelector string) *SelectorScraper { + return &SelectorScraper{ + namespace: namespace, + labelSelector: labelSelector, + } +} + +// Start returns a reader that streams the log of all selected pods +func (s *SelectorScraper) Start(ctx context.Context) *bufio.Reader { + pipeIn, pipeOut := io.Pipe() + bufPipeIn := bufio.NewReader(pipeIn) + bufPipeOut := bufio.NewWriter(pipeOut) + closeFun := func() error { + bufPipeOut.Flush() + return pipeOut.Close() + } + go s.periodicSynchronize(ctx, bufPipeOut, closeFun) + return bufPipeIn +} + +func (s *SelectorScraper) periodicSynchronize(ctx context.Context, out *bufio.Writer, clientCloser func() error) { + err := s.synchronize(ctx, out, clientCloser) + if err != nil { + logrus.Warn("Could not synchronize log by label " + s.labelSelector) + } + select { + case <- ctx.Done(): + // cleanup + s.podScrapers.Range(func(k, v interface{}) bool { + if canc, isCanc := v.(context.CancelFunc); isCanc { + canc() + } + + return true + }) + clientCloser() + case <- time.After(2*time.Second): + go s.periodicSynchronize(ctx, out, clientCloser) + } +} + +func (s *SelectorScraper) synchronize(ctx context.Context, out *bufio.Writer, clientCloser func() error) error { + list, err := s.listPods() + if err != nil { + return err + } + + present := make(map[string]bool) + for _, pod := range list.Items { + present[pod.Name] = true + if _, ok := s.podScrapers.Load(pod.Name); !ok { + s.addPodScraper(ctx, pod.Name, out) + } + } + + toBeRemoved := make(map[string]bool) + s.podScrapers.Range(func(k, v interface{}) bool { + if str, isStr := k.(string); isStr { + if _, ok := present[str]; !ok { + toBeRemoved[str] = true + } + } + + return true + }) + + for podName := range toBeRemoved { + if scr, ok := s.podScrapers.Load(podName); ok { + if canc, ok2 := scr.(context.CancelFunc); ok2 { + canc() + s.podScrapers.Delete(podName) + } + } + } + return nil +} + +func (s *SelectorScraper) addPodScraper(ctx context.Context, name string, out *bufio.Writer) { + podScraper := NewPodScraper(s.namespace, name) + podCtx, podCancel := context.WithCancel(ctx) + id := atomic.AddUint64(&s.counter, 1) + prefix := "[" + strconv.FormatUint(id, 10) + "] " + podReader := podScraper.Start(podCtx) + s.podScrapers.Store(name, podCancel) + go func() { + defer podCancel() + + out.WriteString(prefix + "Monitoring pod " + name) + for { + str, err := podReader.ReadString('\n') + if err == io.EOF { + return + } else if err != nil { + logrus.Error("Cannot read from pod stream: ", err) + return + } + out.WriteString(prefix + str) + out.Flush() + if podCtx.Err() != nil { + return + } + } + }() + +} + +func (s *SelectorScraper) listPods() (*v1.PodList, error) { + list := v1.PodList{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: v1.SchemeGroupVersion.String(), + }, + } + + err := sdk.List(s.namespace, &list, sdk.WithListOptions(&metav1.ListOptions{ + LabelSelector: s.labelSelector, + })) + + if err != nil { + return nil, err + } + + return &list, nil +} diff --git a/pkg/util/log/pod_scraper.go b/pkg/util/log/pod_scraper.go new file mode 100644 index 0000000..798f5b7 --- /dev/null +++ b/pkg/util/log/pod_scraper.go @@ -0,0 +1,175 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package log + +import ( + "bufio" + "context" + "github.com/operator-framework/operator-sdk/pkg/k8sclient" + "github.com/operator-framework/operator-sdk/pkg/util/k8sutil" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "io" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "time" +) + +// PodScraper scrapes logs of a specific pod +type PodScraper struct { + namespace string + name string +} + +// NewPodScraper creates a new pod scraper +func NewPodScraper(namespace string, name string) *PodScraper { + return &PodScraper{ + namespace: namespace, + name: name, + } +} + +// Start returns a reader that streams the pod logs +func (s *PodScraper) Start(ctx context.Context) *bufio.Reader { + pipeIn, pipeOut := io.Pipe() + bufPipeIn := bufio.NewReader(pipeIn) + bufPipeOut := bufio.NewWriter(pipeOut) + closeFun := func() error { + bufPipeOut.Flush() + return pipeOut.Close() + } + go s.doScrape(ctx, bufPipeOut, closeFun) + return bufPipeIn +} + +func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientCloser func() error) { + err := s.waitForPodRunning(ctx, s.namespace, s.name) + if err != nil { + s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser) + return + } + + byteReader, err := k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, &v1.PodLogOptions{Follow: true}).Context(ctx).Stream() + if err != nil { + s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser) + return + } + + reader := bufio.NewReader(byteReader) + err = nil + for err == nil { + str, err := reader.ReadString('\n') + if err != nil { + break + } + _, err = out.WriteString(str) + if err != nil { + break + } + out.Flush() + } + if err == io.EOF { + return + } + + s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser) +} + +func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.Duration, out *bufio.Writer, clientCloser func() error) { + if err != nil { + logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.name)) + } + + if ctx.Err() != nil { + logrus.Info("Pod ", s.name, " will no longer be monitored") + clientCloser() + return + } + + logrus.Info("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...") + select { + case <-time.After(wait): + break + case <-ctx.Done(): + clientCloser() + return + } + + s.doScrape(ctx, out, clientCloser) +} + +// Waits for a given pod to reach the running state +func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) error { + pod := v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: v1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + resourceClient, _, err := k8sclient.GetResourceClient(pod.APIVersion, pod.Kind, pod.Namespace) + if err != nil { + return err + } + watcher, err := resourceClient.Watch(metav1.ListOptions{ + FieldSelector: "metadata.name=" + pod.Name, + }) + if err != nil { + return err + } + events := watcher.ResultChan() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case e, ok := <-events: + if !ok { + return errors.New("event channel closed") + } + + if e.Object != nil { + if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok { + unstr := unstructured.Unstructured{ + Object: runtimeUnstructured.UnstructuredContent(), + } + pcopy := pod.DeepCopy() + err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, pcopy) + if err != nil { + return err + } + + if pcopy.Status.Phase == v1.PodRunning { + return nil + } + } + } else if e.Type == watch.Deleted || e.Type == watch.Error { + return errors.New("unable to watch pod " + s.name) + } + case <-time.After(30 * time.Second): + return errors.New("no state change after 30 seconds for pod " + s.name) + } + } + + return nil +} diff --git a/test/build_manager_integration_test.go b/test/build_manager_integration_test.go index d031fb4..a352009 100644 --- a/test/build_manager_integration_test.go +++ b/test/build_manager_integration_test.go @@ -34,7 +34,7 @@ import ( func TestBuildManagerBuild(t *testing.T) { ctx := context.TODO() - buildManager := build.NewManager(ctx, GetTargetNamespace()) + buildManager := build.NewManager(ctx, getTargetNamespace()) identifier := buildapi.BuildIdentifier{ Name: "man-test", Qualifier: digest.Random(), @@ -42,7 +42,7 @@ func TestBuildManagerBuild(t *testing.T) { buildManager.Start(buildapi.BuildSource{ Identifier: identifier, Code: buildapi.Code{ - Content: TimerToLogIntegrationCode(), + Content: createTimerToLogIntegrationCode(), }, Dependencies: []string{ "mvn:org.apache.camel/camel-core", @@ -68,7 +68,7 @@ func TestBuildManagerBuild(t *testing.T) { func TestBuildManagerFailedBuild(t *testing.T) { ctx := context.TODO() - buildManager := build.NewManager(ctx, GetTargetNamespace()) + buildManager := build.NewManager(ctx, getTargetNamespace()) identifier := buildapi.BuildIdentifier{ Name: "man-test-2", Qualifier: digest.Random(), @@ -76,7 +76,7 @@ func TestBuildManagerFailedBuild(t *testing.T) { buildManager.Start(buildapi.BuildSource{ Identifier: identifier, Code: buildapi.Code{ - Content: TimerToLogIntegrationCode(), + Content: createTimerToLogIntegrationCode(), }, Dependencies: []string{ "mvn:org.apache.camel/camel-cippalippa", diff --git a/test/local_builder_integration_test.go b/test/local_builder_integration_test.go index 79cedeb..61983ce 100644 --- a/test/local_builder_integration_test.go +++ b/test/local_builder_integration_test.go @@ -34,7 +34,7 @@ import ( func TestLocalBuild(t *testing.T) { ctx := context.TODO() - builder := local.NewLocalBuilder(ctx, GetTargetNamespace()) + builder := local.NewLocalBuilder(ctx, getTargetNamespace()) execution := builder.Build(build.BuildSource{ Identifier: build.BuildIdentifier{ @@ -42,7 +42,7 @@ func TestLocalBuild(t *testing.T) { Qualifier: digest.Random(), }, Code: build.Code{ - Content: TimerToLogIntegrationCode(), + Content: createTimerToLogIntegrationCode(), }, }) @@ -54,7 +54,7 @@ func TestLocalBuild(t *testing.T) { func TestLocalDoubleBuild(t *testing.T) { ctx := context.TODO() - builder := local.NewLocalBuilder(ctx, GetTargetNamespace()) + builder := local.NewLocalBuilder(ctx, getTargetNamespace()) execution1 := builder.Build(build.BuildSource{ Identifier: build.BuildIdentifier{ @@ -62,7 +62,7 @@ func TestLocalDoubleBuild(t *testing.T) { Qualifier: digest.Random(), }, Code: build.Code{ - Content: TimerToLogIntegrationCode(), + Content: createTimerToLogIntegrationCode(), }, }) @@ -72,7 +72,7 @@ func TestLocalDoubleBuild(t *testing.T) { Qualifier: digest.Random(), }, Code: build.Code{ - Content: TimerToLogIntegrationCode(), + Content: createTimerToLogIntegrationCode(), }, }) @@ -86,7 +86,7 @@ func TestLocalDoubleBuild(t *testing.T) { func TestLocalFailedBuild(t *testing.T) { ctx := context.TODO() - builder := local.NewLocalBuilder(ctx, GetTargetNamespace()) + builder := local.NewLocalBuilder(ctx, getTargetNamespace()) execution := builder.Build(build.BuildSource{ Identifier: build.BuildIdentifier{ @@ -94,7 +94,7 @@ func TestLocalFailedBuild(t *testing.T) { Qualifier: digest.Random(), }, Code: build.Code{ - Content: TimerToLogIntegrationCode(), + Content: createTimerToLogIntegrationCode(), }, Dependencies: []string{ "camel:cippalippa", diff --git a/test/log_scrape_integration_test.go b/test/log_scrape_integration_test.go new file mode 100644 index 0000000..2052cc5 --- /dev/null +++ b/test/log_scrape_integration_test.go @@ -0,0 +1,106 @@ +// +build integration + +// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration" + +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "testing" + "github.com/apache/camel-k/pkg/util/log" + "context" + "time" + "github.com/stretchr/testify/assert" + "strings" + "github.com/operator-framework/operator-sdk/pkg/sdk" +) + +func TestPodLogScrape(t *testing.T) { + token := "Hello Camel K!" + pod, err := createDummyPod("scraped", "/bin/sh", "-c", "for i in `seq 1 50`; do echo \""+token+"\" && sleep 2; done") + defer sdk.Delete(pod) + assert.Nil(t, err) + + ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) + scraper := log.NewPodScraper(pod.Namespace, pod.Name) + in := scraper.Start(ctx) + + res := make(chan bool) + go func() { + for { + if dl, _ := ctx.Deadline(); time.Now().After(dl) { + return + } + + str, _ := in.ReadString('\n') + if strings.Contains(str, token) { + res <- true + return + } + } + }() + + select { + case <-res: + break + case <-time.After(30 * time.Second): + assert.Fail(t, "timeout while waiting from token") + } +} + +func TestSelectorLogScrape(t *testing.T) { + token := "Hello Camel K!" + replicas := int32(3) + deployment, err := createDummyDeployment("scraped-deployment", &replicas, "scrape", "me", "/bin/sh", "-c", "for i in `seq 1 50`; do echo \""+token+"\" && sleep 2; done") + defer sdk.Delete(deployment) + assert.Nil(t, err) + + ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) + scraper := log.NewSelectorScraper(deployment.Namespace, "scrape=me") + in := scraper.Start(ctx) + + res := make(chan string) + go func() { + for { + if dl, _ := ctx.Deadline(); time.Now().After(dl) { + return + } + + str, _ := in.ReadString('\n') + if strings.Contains(str, token) { + res <- str[0:3] + } + } + }() + + recv := make(map[string]bool) +loop: + for { + select { + case r := <-res: + recv[r] = true + if len(recv) == 3 { + break loop + } + case <-time.After(13 * time.Second): + assert.Fail(t, "timeout while waiting from token") + break loop + } + } +} diff --git a/test/testing_env.go b/test/testing_env.go index 3b1e215..10fc8e5 100644 --- a/test/testing_env.go +++ b/test/testing_env.go @@ -24,6 +24,12 @@ package test import ( "github.com/apache/camel-k/pkg/install" "github.com/apache/camel-k/pkg/util/kubernetes" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/operator-framework/operator-sdk/pkg/sdk" + "time" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + appsv1 "k8s.io/api/apps/v1" ) func init() { @@ -35,13 +41,13 @@ func init() { panic(err) } - err = install.Operator(GetTargetNamespace()) + err = install.Operator(getTargetNamespace()) if err != nil { panic(err) } } -func GetTargetNamespace() string { +func getTargetNamespace() string { ns, err := kubernetes.GetClientCurrentNamespace("") if err != nil { panic(err) @@ -49,7 +55,7 @@ func GetTargetNamespace() string { return ns } -func TimerToLogIntegrationCode() string { +func createTimerToLogIntegrationCode() string { return ` import org.apache.camel.builder.RouteBuilder; @@ -64,3 +70,109 @@ public class Routes extends RouteBuilder { } ` } + +func createDummyDeployment(name string, replicas *int32, labelKey string, labelValue string, command ...string) (*appsv1.Deployment, error) { + deployment := getDummyDeployment(name, replicas, labelKey, labelValue, command...) + gracePeriod := int64(0) + err := sdk.Delete(&deployment, sdk.WithDeleteOptions(&metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod})) + if err != nil && !k8serrors.IsNotFound(err) { + return nil, err + } + for { + list := v1.PodList{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: v1.SchemeGroupVersion.String(), + }, + } + + err := sdk.List(getTargetNamespace(), &list, sdk.WithListOptions(&metav1.ListOptions{ + LabelSelector: labelKey + "=" + labelValue, + })) + if err != nil { + return nil, err + } + + if len(list.Items) > 0 { + time.Sleep(1*time.Second) + } else { + break + } + } + err = sdk.Create(&deployment) + return &deployment, err +} + +func getDummyDeployment(name string, replicas *int32, labelKey string, labelValue string, command ...string) appsv1.Deployment { + return appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: appsv1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: getTargetNamespace(), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + labelKey: labelValue, + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + labelKey: labelValue, + }, + }, + Spec: getDummyPod(name, command...).Spec, + }, + }, + } +} + +func createDummyPod(name string, command ...string) (*v1.Pod, error) { + pod := getDummyPod(name, command...) + gracePeriod := int64(0) + err := sdk.Delete(&pod, sdk.WithDeleteOptions(&metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod})) + if err != nil && !k8serrors.IsNotFound(err) { + return nil, err + } + for { + err := sdk.Create(&pod) + if err != nil && k8serrors.IsAlreadyExists(err) { + time.Sleep(1 * time.Second) + } else if err != nil { + return nil, err + } else { + break + } + } + return &pod, nil +} + +func getDummyPod(name string, command ...string) v1.Pod { + gracePeriod := int64(0) + return v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: v1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: getTargetNamespace(), + Name: name, + }, + Spec: v1.PodSpec{ + TerminationGracePeriodSeconds: &gracePeriod, + Containers: []v1.Container{ + { + Name: name, + Image: "busybox", + Command: command, + }, + }, + }, + } +} +