This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/master by this push:
     new 44f1e40  Pod log implementation
44f1e40 is described below

commit 44f1e40dcf038ada075f57ef073c3f113d7adbcb
Author: nferraro <ni.ferr...@gmail.com>
AuthorDate: Tue Sep 18 01:32:08 2018 +0200

    Pod log implementation
---
 Gopkg.lock                             |   1 +
 pkg/client/cmd/run.go                  |  25 +++++
 pkg/util/log/annotation_scraper.go     | 166 +++++++++++++++++++++++++++++++
 pkg/util/log/pod_scraper.go            | 175 +++++++++++++++++++++++++++++++++
 test/build_manager_integration_test.go |   8 +-
 test/local_builder_integration_test.go |  14 +--
 test/log_scrape_integration_test.go    | 106 ++++++++++++++++++++
 test/testing_env.go                    | 118 +++++++++++++++++++++-
 8 files changed, 599 insertions(+), 14 deletions(-)

diff --git a/Gopkg.lock b/Gopkg.lock
index bc54a02..823b59c 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -736,6 +736,7 @@
     "k8s.io/apimachinery/pkg/runtime/serializer/json",
     "k8s.io/apimachinery/pkg/runtime/serializer/versioning",
     "k8s.io/apimachinery/pkg/util/yaml",
+    "k8s.io/apimachinery/pkg/watch",
     "k8s.io/client-go/plugin/pkg/client/auth/gcp",
     "k8s.io/client-go/rest",
     "k8s.io/client-go/tools/clientcmd",
diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go
index c4df815..171ed1d 100644
--- a/pkg/client/cmd/run.go
+++ b/pkg/client/cmd/run.go
@@ -32,6 +32,8 @@ import (
        "github.com/spf13/cobra"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "github.com/apache/camel-k/pkg/util/log"
+       "io"
 )
 
 // NewCmdRun --
@@ -56,6 +58,7 @@ func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command 
{
        cmd.Flags().StringSliceVarP(&options.Properties, "property", "p", nil, 
"Add a camel property")
        cmd.Flags().StringSliceVar(&options.ConfigMaps, "configmap", nil, "Add 
a ConfigMap")
        cmd.Flags().StringSliceVar(&options.Secrets, "secret", nil, "Add a 
Secret")
+       cmd.Flags().BoolVar(&options.Logs, "logs", false, "Print integration 
logs")
 
        return &cmd
 }
@@ -70,6 +73,7 @@ type runCmdOptions struct {
        ConfigMaps         []string
        Secrets            []string
        Wait               bool
+       Logs               bool
 }
 
 func (*runCmdOptions) validateArgs(cmd *cobra.Command, args []string) error {
@@ -96,6 +100,12 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args 
[]string) error {
                        return err
                }
        }
+       if o.Logs {
+               err = o.printLogs(integration)
+               if err != nil {
+                       return err
+               }
+       }
        return nil
 }
 
@@ -137,6 +147,21 @@ watcher:
        return nil
 }
 
+// printLogs copies to standard output the log lines of the pods labelled with
+// the name of the given integration. It returns nil once the stream reports
+// io.EOF or the command context is done, and the read error in any other case.
+func (o *runCmdOptions) printLogs(integration *v1alpha1.Integration) error {
+       selector := "camel.apache.org/integration=" + integration.Name
+       lines := log.NewSelectorScraper(integration.Namespace, selector).Start(o.Context)
+       for {
+               line, err := lines.ReadString('\n')
+               switch {
+               case err == io.EOF, o.Context.Err() != nil:
+                       // End of stream or cancelled command: stop without reporting an error.
+                       return nil
+               case err != nil:
+                       return err
+               }
+               fmt.Print(line)
+       }
+}
+
 func (o *runCmdOptions) createIntegration(cmd *cobra.Command, args []string) 
(*v1alpha1.Integration, error) {
        code, err := o.loadCode(args[0])
        if err != nil {
diff --git a/pkg/util/log/annotation_scraper.go 
b/pkg/util/log/annotation_scraper.go
new file mode 100644
index 0000000..2e47b5e
--- /dev/null
+++ b/pkg/util/log/annotation_scraper.go
@@ -0,0 +1,166 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package log
+
+import (
+       "bufio"
+       "io"
+       "context"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "github.com/operator-framework/operator-sdk/pkg/sdk"
+       "k8s.io/api/core/v1"
+       "sync"
+       "sync/atomic"
+       "strconv"
+       "github.com/sirupsen/logrus"
+       "time"
+)
+
+// SelectorScraper scrapes all pods with a given selector
+type SelectorScraper struct {
+       // counter is incremented with atomic.AddUint64. It is kept as the first
+       // field so that it is 64-bit aligned on 32-bit platforms too.
+       counter       uint64
+       namespace     string
+       labelSelector string
+       // podScrapers maps a pod name to the context.CancelFunc of its scraper.
+       podScrapers   sync.Map
+}
+
+// NewSelectorScraper creates a new SelectorScraper for the pods of the given
+// namespace that match labelSelector.
+func NewSelectorScraper(namespace string, labelSelector string) *SelectorScraper {
+       scraper := new(SelectorScraper)
+       scraper.namespace = namespace
+       scraper.labelSelector = labelSelector
+       return scraper
+}
+
+// Start returns a reader that streams the log of all selected pods.
+// The stream is fed by a background goroutine through an in-memory pipe.
+func (s *SelectorScraper) Start(ctx context.Context) *bufio.Reader {
+       readEnd, writeEnd := io.Pipe()
+       writer := bufio.NewWriter(writeEnd)
+       // shutdown flushes what is still buffered and closes the write end of the pipe.
+       shutdown := func() error {
+               writer.Flush()
+               return writeEnd.Close()
+       }
+       go s.periodicSynchronize(ctx, writer, shutdown)
+       return bufio.NewReader(readEnd)
+}
+
+// periodicSynchronize runs synchronize immediately and then every two seconds.
+// When ctx is done it cancels every registered pod scraper, calls clientCloser
+// and returns.
+func (s *SelectorScraper) periodicSynchronize(ctx context.Context, out *bufio.Writer, clientCloser func() error) {
+       for {
+               if err := s.synchronize(ctx, out, clientCloser); err != nil {
+                       // Report the cause as well, otherwise the failure cannot be diagnosed.
+                       logrus.Warn("Could not synchronize log by label "+s.labelSelector+": ", err)
+               }
+               select {
+               case <-ctx.Done():
+                       // cleanup
+                       s.podScrapers.Range(func(_, v interface{}) bool {
+                               if cancel, isCancel := v.(context.CancelFunc); isCancel {
+                                       cancel()
+                               }
+                               return true
+                       })
+                       if err := clientCloser(); err != nil {
+                               logrus.Warn("Could not close log stream for label "+s.labelSelector+": ", err)
+                       }
+                       return
+               case <-time.After(2 * time.Second):
+                       // Next round: loop in place instead of spawning a goroutine per tick.
+               }
+       }
+}
+
+// synchronize lists the pods matching the selector, starts a scraper for every
+// listed pod that does not have one yet, and cancels and forgets the scrapers
+// of pods that are no longer listed. It returns the listing error, if any.
+func (s *SelectorScraper) synchronize(ctx context.Context, out *bufio.Writer, clientCloser func() error) error {
+       pods, err := s.listPods()
+       if err != nil {
+               return err
+       }
+
+       alive := make(map[string]struct{}, len(pods.Items))
+       for i := range pods.Items {
+               podName := pods.Items[i].Name
+               alive[podName] = struct{}{}
+               if _, known := s.podScrapers.Load(podName); known {
+                       continue
+               }
+               s.addPodScraper(ctx, podName, out)
+       }
+
+       // Collect the stale names first, remove them once the iteration is over.
+       var stale []string
+       s.podScrapers.Range(func(key, _ interface{}) bool {
+               if podName, isString := key.(string); isString {
+                       if _, found := alive[podName]; !found {
+                               stale = append(stale, podName)
+                       }
+               }
+               return true
+       })
+
+       for _, podName := range stale {
+               value, loaded := s.podScrapers.Load(podName)
+               if !loaded {
+                       continue
+               }
+               if cancel, isCancel := value.(context.CancelFunc); isCancel {
+                       cancel()
+                       s.podScrapers.Delete(podName)
+               }
+       }
+       return nil
+}
+
+// addPodScraper starts scraping the pod with the given name and registers the
+// cancel function of the scraper in podScrapers under the pod name. Every line
+// read from the pod is copied to out, prefixed with "[N] " where N is a counter
+// incremented for each scraper that is started.
+//
+// NOTE(review): out is handed to every pod goroutine and bufio.Writer is not
+// safe for concurrent use; consider guarding it with a mutex owned by
+// SelectorScraper.
+func (s *SelectorScraper) addPodScraper(ctx context.Context, name string, out *bufio.Writer) {
+       podScraper := NewPodScraper(s.namespace, name)
+       podCtx, podCancel := context.WithCancel(ctx)
+       id := atomic.AddUint64(&s.counter, 1)
+       prefix := "[" + strconv.FormatUint(id, 10) + "] "
+       podReader := podScraper.Start(podCtx)
+       s.podScrapers.Store(name, podCancel)
+       go func() {
+               defer podCancel()
+
+               // emit writes one prefixed line and flushes it right away; it
+               // reports false when the output stream is no longer writable.
+               emit := func(line string) bool {
+                       if _, err := out.WriteString(prefix + line); err != nil {
+                               logrus.Error("Cannot write to log stream: ", err)
+                               return false
+                       }
+                       if err := out.Flush(); err != nil {
+                               logrus.Error("Cannot write to log stream: ", err)
+                               return false
+                       }
+                       return true
+               }
+
+               // The banner is a line of its own: terminate it, otherwise it
+               // gets merged with the first log line of the pod.
+               if !emit("Monitoring pod " + name + "\n") {
+                       return
+               }
+               for {
+                       str, err := podReader.ReadString('\n')
+                       if err == io.EOF {
+                               // ReadString may return an unterminated last line along with EOF.
+                               if str != "" {
+                                       emit(str + "\n")
+                               }
+                               return
+                       } else if err != nil {
+                               logrus.Error("Cannot read from pod stream: ", err)
+                               return
+                       }
+                       if !emit(str) {
+                               return
+                       }
+                       if podCtx.Err() != nil {
+                               return
+                       }
+               }
+       }()
+
+}
+
+// listPods returns the pods of the scraper namespace that match its label
+// selector, or the error reported by sdk.List.
+func (s *SelectorScraper) listPods() (*v1.PodList, error) {
+       pods := &v1.PodList{}
+       pods.Kind = "Pod"
+       pods.APIVersion = v1.SchemeGroupVersion.String()
+
+       options := &metav1.ListOptions{LabelSelector: s.labelSelector}
+       if err := sdk.List(s.namespace, pods, sdk.WithListOptions(options)); err != nil {
+               return nil, err
+       }
+       return pods, nil
+}
diff --git a/pkg/util/log/pod_scraper.go b/pkg/util/log/pod_scraper.go
new file mode 100644
index 0000000..798f5b7
--- /dev/null
+++ b/pkg/util/log/pod_scraper.go
@@ -0,0 +1,175 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package log
+
+import (
+       "bufio"
+       "context"
+       "github.com/operator-framework/operator-sdk/pkg/k8sclient"
+       "github.com/operator-framework/operator-sdk/pkg/util/k8sutil"
+       "github.com/pkg/errors"
+       "github.com/sirupsen/logrus"
+       "io"
+       "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/watch"
+       "time"
+)
+
+// PodScraper scrapes logs of a specific pod, identified by its namespace and
+// its name.
+type PodScraper struct {
+       namespace, name string
+}
+
+// NewPodScraper creates a new pod scraper for the pod with the given name in
+// the given namespace.
+func NewPodScraper(namespace string, name string) *PodScraper {
+       scraper := new(PodScraper)
+       scraper.namespace = namespace
+       scraper.name = name
+       return scraper
+}
+
+// Start returns a reader that streams the pod logs.
+// The stream is fed by a background goroutine through an in-memory pipe.
+func (s *PodScraper) Start(ctx context.Context) *bufio.Reader {
+       readEnd, writeEnd := io.Pipe()
+       writer := bufio.NewWriter(writeEnd)
+       // shutdown flushes what is still buffered and closes the write end of the pipe.
+       shutdown := func() error {
+               writer.Flush()
+               return writeEnd.Close()
+       }
+       go s.doScrape(ctx, writer, shutdown)
+       return bufio.NewReader(readEnd)
+}
+
+// doScrape waits for the pod to be running, opens its log stream in follow
+// mode and copies it to out line by line, flushing after every line.
+//
+// When the log stream ends with io.EOF the client stream is closed through
+// clientCloser and scraping stops. Any other failure is passed to
+// handleAndRestart, which retries after 5 seconds unless ctx is done.
+func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientCloser func() error) {
+       err := s.waitForPodRunning(ctx, s.namespace, s.name)
+       if err != nil {
+               s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
+               return
+       }
+
+       byteReader, err := k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, &v1.PodLogOptions{Follow: true}).Context(ctx).Stream()
+       if err != nil {
+               s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
+               return
+       }
+
+       reader := bufio.NewReader(byteReader)
+       for {
+               // Assign to the outer err (no ":=") so that the reason for
+               // leaving the loop is still visible after it.
+               var str string
+               str, err = reader.ReadString('\n')
+               if str != "" {
+                       // ReadString may return data together with an error: forward it.
+                       if _, werr := out.WriteString(str); werr != nil {
+                               err = werr
+                               break
+                       }
+                       if werr := out.Flush(); werr != nil {
+                               err = werr
+                               break
+                       }
+               }
+               if err != nil {
+                       break
+               }
+       }
+       // Release the log stream before returning or retrying; the outcome of
+       // Close is not actionable here.
+       byteReader.Close()
+
+       if err == io.EOF {
+               // Regular end of the pod log: close the client stream so that
+               // readers observe EOF instead of blocking forever.
+               clientCloser()
+               return
+       }
+
+       s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
+}
+
+// handleAndRestart logs err (when not nil) and then either gives up or retries.
+// If ctx is already done, or becomes done while waiting, the client stream is
+// closed through clientCloser; otherwise doScrape is called again once the
+// wait duration has elapsed.
+func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.Duration, out *bufio.Writer, clientCloser func() error) {
+       if err != nil {
+               logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.name))
+       }
+
+       if ctx.Err() == nil {
+               logrus.Info("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
+               select {
+               case <-time.After(wait):
+                       s.doScrape(ctx, out, clientCloser)
+               case <-ctx.Done():
+                       clientCloser()
+               }
+               return
+       }
+
+       logrus.Info("Pod ", s.name, " will no longer be monitored")
+       clientCloser()
+}
+
+// Waits for a given pod to reach the running state.
+//
+// It watches the pod by name and returns nil as soon as an event reports the
+// phase v1.PodRunning. It returns an error when ctx is done, when the watch
+// cannot be set up, when the event channel is closed, when a Deleted or Error
+// event is received, when an event object cannot be converted into a pod, or
+// when no event is received for 30 seconds.
+func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) error {
+       pod := v1.Pod{
+               TypeMeta: metav1.TypeMeta{
+                       Kind:       "Pod",
+                       APIVersion: v1.SchemeGroupVersion.String(),
+               },
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      name,
+                       Namespace: namespace,
+               },
+       }
+       resourceClient, _, err := k8sclient.GetResourceClient(pod.APIVersion, pod.Kind, pod.Namespace)
+       if err != nil {
+               return err
+       }
+       watcher, err := resourceClient.Watch(metav1.ListOptions{
+               FieldSelector: "metadata.name=" + pod.Name,
+       })
+       if err != nil {
+               return err
+       }
+       // Always release the watch, whatever the way out of this function.
+       defer watcher.Stop()
+
+       events := watcher.ResultChan()
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case e, ok := <-events:
+                       if !ok {
+                               return errors.New("event channel closed")
+                       }
+
+                       // Check the event type first: Deleted and Error events
+                       // may carry an object too.
+                       if e.Type == watch.Deleted || e.Type == watch.Error {
+                               return errors.New("unable to watch pod " + s.name)
+                       }
+
+                       runtimeUnstructured, isUnstructured := e.Object.(runtime.Unstructured)
+                       if !isUnstructured {
+                               // No usable object in this event: wait for the next one.
+                               continue
+                       }
+                       unstr := unstructured.Unstructured{
+                               Object: runtimeUnstructured.UnstructuredContent(),
+                       }
+                       pcopy := pod.DeepCopy()
+                       if err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, pcopy); err != nil {
+                               return err
+                       }
+
+                       if pcopy.Status.Phase == v1.PodRunning {
+                               return nil
+                       }
+               case <-time.After(30 * time.Second):
+                       return errors.New("no state change after 30 seconds for pod " + s.name)
+               }
+       }
+}
diff --git a/test/build_manager_integration_test.go 
b/test/build_manager_integration_test.go
index d031fb4..a352009 100644
--- a/test/build_manager_integration_test.go
+++ b/test/build_manager_integration_test.go
@@ -34,7 +34,7 @@ import (
 
 func TestBuildManagerBuild(t *testing.T) {
        ctx := context.TODO()
-       buildManager := build.NewManager(ctx, GetTargetNamespace())
+       buildManager := build.NewManager(ctx, getTargetNamespace())
        identifier := buildapi.BuildIdentifier{
                Name:   "man-test",
                Qualifier: digest.Random(),
@@ -42,7 +42,7 @@ func TestBuildManagerBuild(t *testing.T) {
        buildManager.Start(buildapi.BuildSource{
                Identifier: identifier,
                Code: buildapi.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
                Dependencies: []string{
                        "mvn:org.apache.camel/camel-core",
@@ -68,7 +68,7 @@ func TestBuildManagerBuild(t *testing.T) {
 func TestBuildManagerFailedBuild(t *testing.T) {
 
        ctx := context.TODO()
-       buildManager := build.NewManager(ctx, GetTargetNamespace())
+       buildManager := build.NewManager(ctx, getTargetNamespace())
        identifier := buildapi.BuildIdentifier{
                Name:   "man-test-2",
                Qualifier: digest.Random(),
@@ -76,7 +76,7 @@ func TestBuildManagerFailedBuild(t *testing.T) {
        buildManager.Start(buildapi.BuildSource{
                Identifier: identifier,
                Code: buildapi.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
                Dependencies: []string{
                        "mvn:org.apache.camel/camel-cippalippa",
diff --git a/test/local_builder_integration_test.go 
b/test/local_builder_integration_test.go
index 79cedeb..61983ce 100644
--- a/test/local_builder_integration_test.go
+++ b/test/local_builder_integration_test.go
@@ -34,7 +34,7 @@ import (
 func TestLocalBuild(t *testing.T) {
 
        ctx := context.TODO()
-       builder := local.NewLocalBuilder(ctx, GetTargetNamespace())
+       builder := local.NewLocalBuilder(ctx, getTargetNamespace())
 
        execution := builder.Build(build.BuildSource{
                Identifier: build.BuildIdentifier{
@@ -42,7 +42,7 @@ func TestLocalBuild(t *testing.T) {
                        Qualifier: digest.Random(),
                },
                Code: build.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
        })
 
@@ -54,7 +54,7 @@ func TestLocalBuild(t *testing.T) {
 func TestLocalDoubleBuild(t *testing.T) {
 
        ctx := context.TODO()
-       builder := local.NewLocalBuilder(ctx, GetTargetNamespace())
+       builder := local.NewLocalBuilder(ctx, getTargetNamespace())
 
        execution1 := builder.Build(build.BuildSource{
                Identifier: build.BuildIdentifier{
@@ -62,7 +62,7 @@ func TestLocalDoubleBuild(t *testing.T) {
                        Qualifier: digest.Random(),
                },
                Code: build.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
        })
 
@@ -72,7 +72,7 @@ func TestLocalDoubleBuild(t *testing.T) {
                        Qualifier: digest.Random(),
                },
                Code: build.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
        })
 
@@ -86,7 +86,7 @@ func TestLocalDoubleBuild(t *testing.T) {
 func TestLocalFailedBuild(t *testing.T) {
 
        ctx := context.TODO()
-       builder := local.NewLocalBuilder(ctx, GetTargetNamespace())
+       builder := local.NewLocalBuilder(ctx, getTargetNamespace())
 
        execution := builder.Build(build.BuildSource{
                Identifier: build.BuildIdentifier{
@@ -94,7 +94,7 @@ func TestLocalFailedBuild(t *testing.T) {
                        Qualifier: digest.Random(),
                },
                Code: build.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
                Dependencies: []string{
                        "camel:cippalippa",
diff --git a/test/log_scrape_integration_test.go 
b/test/log_scrape_integration_test.go
new file mode 100644
index 0000000..2052cc5
--- /dev/null
+++ b/test/log_scrape_integration_test.go
@@ -0,0 +1,106 @@
+// +build integration
+
+// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration"
+
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package test
+
+import (
+       "testing"
+       "github.com/apache/camel-k/pkg/util/log"
+       "context"
+       "time"
+       "github.com/stretchr/testify/assert"
+       "strings"
+       "github.com/operator-framework/operator-sdk/pkg/sdk"
+)
+
+// TestPodLogScrape creates a pod that echoes a token every two seconds and
+// checks that the token shows up in the stream of a PodScraper within 30 seconds.
+func TestPodLogScrape(t *testing.T) {
+       token := "Hello Camel K!"
+       pod, err := createDummyPod("scraped", "/bin/sh", "-c", "for i in `seq 1 50`; do echo \""+token+"\" && sleep 2; done")
+       // Stop here on failure: pod must not be used when it could not be created.
+       if !assert.Nil(t, err) {
+               return
+       }
+       defer sdk.Delete(pod)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+       // Release the context (and stop the scraper) when the test returns.
+       defer cancel()
+       scraper := log.NewPodScraper(pod.Namespace, pod.Name)
+       in := scraper.Start(ctx)
+
+       // Buffered, so that the goroutine can deliver its result and exit even
+       // if the test has already timed out.
+       res := make(chan bool, 1)
+       go func() {
+               for ctx.Err() == nil {
+                       str, _ := in.ReadString('\n')
+                       if strings.Contains(str, token) {
+                               res <- true
+                               return
+                       }
+               }
+       }()
+
+       select {
+       case <-res:
+       case <-time.After(30 * time.Second):
+               assert.Fail(t, "timeout while waiting from token")
+       }
+}
+
+// TestSelectorLogScrape creates a deployment with three replicas that echo a
+// token every two seconds and checks that a SelectorScraper delivers the token
+// under three distinct line prefixes, one for each scraped pod.
+func TestSelectorLogScrape(t *testing.T) {
+       token := "Hello Camel K!"
+       replicas := int32(3)
+       deployment, err := createDummyDeployment("scraped-deployment", &replicas, "scrape", "me", "/bin/sh", "-c", "for i in `seq 1 50`; do echo \""+token+"\" && sleep 2; done")
+       // Stop here on failure: deployment must not be used when it could not be created.
+       if !assert.Nil(t, err) {
+               return
+       }
+       defer sdk.Delete(deployment)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+       // Release the context (and stop the scrapers) when the test returns.
+       defer cancel()
+       scraper := log.NewSelectorScraper(deployment.Namespace, "scrape=me")
+       in := scraper.Start(ctx)
+
+       res := make(chan string)
+       go func() {
+               for ctx.Err() == nil {
+                       str, _ := in.ReadString('\n')
+                       if !strings.Contains(str, token) {
+                               continue
+                       }
+                       // The first three characters are the "[N]" prefix of the pod.
+                       select {
+                       case res <- str[0:3]:
+                       case <-ctx.Done():
+                               // Nobody is listening any more: do not block forever.
+                               return
+                       }
+               }
+       }()
+
+       recv := make(map[string]bool)
+loop:
+       for {
+               select {
+               case r := <-res:
+                       recv[r] = true
+                       if len(recv) == 3 {
+                               break loop
+                       }
+               case <-time.After(13 * time.Second):
+                       assert.Fail(t, "timeout while waiting from token")
+                       break loop
+               }
+       }
+}
diff --git a/test/testing_env.go b/test/testing_env.go
index 3b1e215..10fc8e5 100644
--- a/test/testing_env.go
+++ b/test/testing_env.go
@@ -24,6 +24,12 @@ package test
 import (
        "github.com/apache/camel-k/pkg/install"
        "github.com/apache/camel-k/pkg/util/kubernetes"
+       "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "github.com/operator-framework/operator-sdk/pkg/sdk"
+       "time"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       appsv1 "k8s.io/api/apps/v1"
 )
 
 func init() {
@@ -35,13 +41,13 @@ func init() {
                panic(err)
        }
 
-       err = install.Operator(GetTargetNamespace())
+       err = install.Operator(getTargetNamespace())
        if err != nil {
                panic(err)
        }
 }
 
-func GetTargetNamespace() string {
+func getTargetNamespace() string {
        ns, err := kubernetes.GetClientCurrentNamespace("")
        if err != nil {
                panic(err)
@@ -49,7 +55,7 @@ func GetTargetNamespace() string {
        return ns
 }
 
-func TimerToLogIntegrationCode() string {
+func createTimerToLogIntegrationCode() string {
        return `
 import org.apache.camel.builder.RouteBuilder;
 
@@ -64,3 +70,109 @@ public class Routes extends RouteBuilder {
 }
 `
 }
+
+// createDummyDeployment deletes any deployment with the same name, polls once
+// per second until no pod carrying the label labelKey=labelValue is listed in
+// the target namespace, and then creates the deployment. The deployment is
+// returned together with the outcome of the creation.
+func createDummyDeployment(name string, replicas *int32, labelKey string, labelValue string, command ...string) (*appsv1.Deployment, error) {
+       deployment := getDummyDeployment(name, replicas, labelKey, labelValue, command...)
+
+       var gracePeriod int64
+       deleteOptions := &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}
+       if err := sdk.Delete(&deployment, sdk.WithDeleteOptions(deleteOptions)); err != nil && !k8serrors.IsNotFound(err) {
+               return nil, err
+       }
+
+       selector := labelKey + "=" + labelValue
+       for {
+               pods := v1.PodList{
+                       TypeMeta: metav1.TypeMeta{
+                               Kind:       "Pod",
+                               APIVersion: v1.SchemeGroupVersion.String(),
+                       },
+               }
+               listOptions := &metav1.ListOptions{LabelSelector: selector}
+               if err := sdk.List(getTargetNamespace(), &pods, sdk.WithListOptions(listOptions)); err != nil {
+                       return nil, err
+               }
+               if len(pods.Items) == 0 {
+                       break
+               }
+               // Pods of a previous run are still around: check again shortly.
+               time.Sleep(1 * time.Second)
+       }
+
+       err := sdk.Create(&deployment)
+       return &deployment, err
+}
+
+// getDummyDeployment builds (without creating it) a deployment in the target
+// namespace whose pods carry the label labelKey=labelValue and use the pod
+// spec returned by getDummyPod for the given name and command.
+func getDummyDeployment(name string, replicas *int32, labelKey string, labelValue string, command ...string) appsv1.Deployment {
+       var deployment appsv1.Deployment
+       deployment.Kind = "Deployment"
+       deployment.APIVersion = appsv1.SchemeGroupVersion.String()
+       deployment.Name = name
+       deployment.Namespace = getTargetNamespace()
+
+       // The selector and the pod template each get their own (equal) label map.
+       deployment.Spec = appsv1.DeploymentSpec{
+               Replicas: replicas,
+               Selector: &metav1.LabelSelector{
+                       MatchLabels: map[string]string{labelKey: labelValue},
+               },
+               Template: v1.PodTemplateSpec{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Labels: map[string]string{labelKey: labelValue},
+                       },
+                       Spec: getDummyPod(name, command...).Spec,
+               },
+       }
+       return deployment
+}
+
+// createDummyPod deletes any pod with the same name and then creates the pod,
+// retrying once per second for as long as the API reports that it already
+// exists. Any other error is returned to the caller.
+func createDummyPod(name string, command ...string) (*v1.Pod, error) {
+       pod := getDummyPod(name, command...)
+
+       var gracePeriod int64
+       deleteOptions := &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}
+       if err := sdk.Delete(&pod, sdk.WithDeleteOptions(deleteOptions)); err != nil && !k8serrors.IsNotFound(err) {
+               return nil, err
+       }
+
+       for {
+               err := sdk.Create(&pod)
+               switch {
+               case err == nil:
+                       return &pod, nil
+               case k8serrors.IsAlreadyExists(err):
+                       // The previous pod has not gone away yet: try again shortly.
+                       time.Sleep(1 * time.Second)
+               default:
+                       return nil, err
+               }
+       }
+}
+
+// getDummyPod builds (without creating it) a pod in the target namespace with
+// a single busybox container that is named after the pod and runs the given
+// command. The termination grace period is set to zero seconds.
+func getDummyPod(name string, command ...string) v1.Pod {
+       var gracePeriod int64
+       container := v1.Container{
+               Name:    name,
+               Image:   "busybox",
+               Command: command,
+       }
+
+       var pod v1.Pod
+       pod.Kind = "Pod"
+       pod.APIVersion = v1.SchemeGroupVersion.String()
+       pod.Namespace = getTargetNamespace()
+       pod.Name = name
+       pod.Spec.TerminationGracePeriodSeconds = &gracePeriod
+       pod.Spec.Containers = []v1.Container{container}
+       return pod
+}
+

Reply via email to