Notifications

In the context of ppacer, notifications are messages sent to systems outside ppacer. A typical example is an alert message sent when a DAG run has failed.

The interface for a notification sender is defined in the ppacer/core/notify package and looks roughly like this:

package notify

import (
	"context"
	"io"
)

type Template interface {
	Execute(io.Writer, any) error
}

type Sender interface {
	Send(context.Context, Template, MsgData) error
}

type MsgData struct {
	DagId        string
	ExecTs       string
	TaskId       *string
	TaskRunError error
	RuntimeInfo  map[string]any
}

The notify.Sender interface has exactly one method, Send, which renders and sends a message for a given context, message template, and relevant data. A message template can be anything that satisfies the notify.Template interface. That interface was designed primarily to support both the text/template and html/template packages from the Go standard library. The notify.MsgData type contains runtime information about the DAG run relevant to the notification we are about to render and send.

Package notify also provides two implementations of the notify.Sender interface:

  • notify.LogsErr - notifications are sent as slog/log error messages. This is the default for local development.
  • notify.Mock - notifications are appended to an existing *[]string. This implementation is primarily used for internal ppacer testing.

Notifications in ppacer

Notification senders in ppacer are used primarily in two places: scheduler.TaskScheduler and implementations of the dag.Task.Execute method, via dag.TaskContext.

The scheduler needs a notifier set up so that it can automatically send notifications when a task execution has failed. This behavior is configurable at the task level via TaskConfig.SendAlertOnFailure, which defaults to true. The same configuration type has a field called AlertOnFailureTemplate, which specifies the template used to render alert messages. The default alert template is defined using text/template as follows:

func DefaultAlertTemplate() *template.Template {
	body := `
Task [{{.TaskId}}] in DAG [{{.DagId}}] at {{.ExecTs}} has failed.
{{- if .TaskRunError}}
Error:
{{.TaskRunError.Error}}
{{end}}
`
	return template.Must(template.New("default").Parse(body))
}

Even though whether alerts are sent and which template they use are configured at the task level, the notification sender itself is set at the scheduler.TaskScheduler level via its Notifier field of type notify.Sender. The default notification sender is notify.LogsErr, which sends notifications as log ERR messages. This means that, by default, ppacer uses a single way of sending notifications (for example, Slack) for all DAGs and tasks. We can override this at the task level when constructing a DAG, like this:

...
emailNotifier := MyEmailNotifier()
n1 := dag.NewNode(emptyTask{taskId: "start"})
n2 := dag.NewNode(
	errTask{taskId: "task1"},
	dag.WithCustomNotifier(emailNotifier),
)
n3 := dag.NewNode(emptyTask{taskId: "end"})
n1.Next(n2).Next(n3)
myDag := dag.New(dagId).AddRoot(n1).Done()

In the example above, the start and end tasks would use the default notifier set in TaskScheduler, but task1 would use the email notifier (this is just an example; an email notifier is not part of the notify package at the moment).

The second place where notification senders can be used is the Task.Execute method, via TaskContext. This enables sending external notifications based on a task's internal logic. For example, a task could query a database, read some statistic, and send an alert message based on its value.

Tasks are executed by exec.Executor. As with the scheduler, we set up a single notification sender instance in the exec.Executor constructor, and it is used for every TaskContext unless a given task overrides it with WithCustomNotifier, as described earlier. Using the notifier from inside the Task.Execute method is as simple as a regular function call:

func (t *MyTask) Execute(tc dag.TaskContext) error {
	if rand.Intn(10) > 7 {
		ctx := context.TODO()
		tmplBody := "Sending a ping: {{.ExecTs}}"
		tmpl := template.Must(template.New("msg").Parse(tmplBody))
		msgData := notify.MsgData{ExecTs: tc.DagRun.ExecTs}
		sendErr := tc.Notifier.Send(ctx, tmpl, msgData)
		if sendErr != nil {
			tc.Logger.Error("Failure while sending external message",
				"err", sendErr.Error())
			return sendErr
		}
	}
	return nil
}

The example above shows a Task implementation that sends an external message when a randomly generated number is greater than seven.

Notifier implementations

Up to this point we have learned about the generic interface for sending notifications and its implementations within the ppacer/core/notify package. Those implementations are primarily useful for local development and writing tests. More practical implementations are available in the github.com/ppacer/notifiers project.

The ppacer/notifiers project has the following structure:

  • .git
  • README.md
  • discord/
    • go.mod
    • go.sum
    • discord.go
  • telegram/
    • go.mod
    • go.sum
    • telegram.go

As we can see, the ppacer/notifiers project contains multiple Go packages. Each Go package provides at least one implementation of ppacer/core/notify.Sender for a specific communication channel. Different communication channels are split into separate Go packages because they might require different sets of dependencies, and typically only one communication channel is used for the scheduler. Fetching the dependencies of every other notifier as well would be wasteful. That is also why we avoided adding communication-channel-specific implementations to the ppacer core notification package.

To get the current list of available notifier implementations, please refer to the ppacer/notifiers README file.