HAOJX

cue工作流入门介绍

字数统计: 679阅读时长: 3 min
2022/08/11 Share

官方文档介绍 https://pkg.go.dev/cuelang.org/go/tools/flow

在k8s中 可以把他用在开发CD/CD中的cd流程 比如**kubevela**(https://kubevela.io/zh/docs/)就的就它搞的 非常不错

一个简单的代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
"context"
"cuelang.org/go/cue"
"cuelang.org/go/cue/cuecontext"
"cuelang.org/go/tools/flow"
"fmt"
"log"
)

const tasks = `
reg: {
uname: "test"
pass: "1234"
}
regrsp: {
result: string
}
data: {
data: string
}
`

func main() {
cc := cuecontext.New()
cv := cc.CompileString(tasks)

//cue worlflow
controller := flow.New(nil, cv, regflow)
if err := controller.Run(context.Background()); err != nil {
log.Fatal(err)
}
for _, step := range controller.Tasks() {
fmt.Println(step.Value())
}
}

func regflow(v cue.Value) (flow.Runner, error) {
uname := v.LookupPath(cue.ParsePath("uname"))
result := v.LookupPath(cue.ParsePath("result"))
if !uname.Exists() && !result.Exists() {
return nil, nil
}
return flow.RunnerFunc(func(t *flow.Task) error {
if uname.Exists() {
unameStr, err := uname.String()
if err != nil {
return err
}
if unameStr == "admin" {
return fmt.Errorf("can not register admin account")
}
return nil
}
return t.Fill(map[string]interface{}{
"result": "register success!!",
})
}), nil
}

大体套路是:

  1. 先启动一个工作流 函数是flow.New 他有三个参数 第一个是一个config 这个后面在说 填nil也可以, 第二个是cue.InstanceOrValue类型的数据 , 他是个interface
1
2
3
4
5
type InstanceOrValue interface {
Value() Value

internal()
}

实现他的类型有Instance和Value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Instance struct {
index *runtime.Runtime

root *adt.Vertex

ImportPath string
Dir string
PkgName string
DisplayName string

Incomplete bool // true if Pkg and all its dependencies are free of errors
Err errors.Error // non-nil if the package had errors

inst *build.Instance
}

1
2
3
4
5
6
7
type Value struct {
idx *runtime.Runtime
v *adt.Vertex
// Parent keeps track of the parent if the value corresponding to v.Parent
// differs, recursively.
parent_ *parent
}

所以我们用CompileString函数 他返回了一个value类型的值

1
2
3
4
5
6
7
8
// CompileString parses and build a Value from the given source string.
//
// The returned Value will represent an error, accessible through Err, if any
// error occurred.
func (c *Context) CompileString(src string, options ...BuildOption) Value {
cfg := c.parseOptions(options)
return c.compile(c.runtime().Compile(&cfg, src))
}

第三个参数是个函数 type TaskFunc func(v cue.Value) (Runner, error) 只要实现了这种函数形式就可以 后面的runner是个interface flow自己提供了一个实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Runner interface {
// Run runs a Task. If any of the tasks it depends on returned an error it
// is passed to this task. It reports an error upon failure.
//
// Any results to be returned can be set by calling Fill on the passed task.
//
// TODO: what is a good contract for receiving and passing errors and abort.
//
// If for a returned error x errors.Is(x, ErrAbort), all dependant tasks
// will not be run, without this being an error.
Run(t *Task, err error) error
}

// A RunnerFunc runs a Task.
type RunnerFunc func(t *Task) error

func (f RunnerFunc) Run(t *Task, err error) error {
return f(t)
}

所以runner返回值的话 我们就可以写成上面例子中的

1
2
3
4
5
6
7
8
9
...

return flow.RunnerFunc(func(t *flow.Task) error {
....
//写你想做的事情, 比如调用k8s的api去apply一个yaml文件
// 比如可以规定文件apply顺序 一个错了后面的就不执行了
}

....
CATALOG