Skip to content

Plugin Development Guide

1. Prerequisites

Development Tools

  • Visual Studio Code
  • Goland
  • Cursor

Install gRPC

shell
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2

Install gRPC-Gateway

shell
$ go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Project Setup

  • Create a Go project, e.g., MyPlugin
  • Create a pkg directory for exportable code
  • Create a pb directory for gRPC proto files
  • Create an example directory for testing the plugin

You can also create a directory xxx directly in the monibuca project's plugin folder to store your plugin code

2. Create a Plugin

go
package plugin_myplugin
import (
    "m7s.live/v5"
)

var _ = m7s.InstallPlugin[MyPlugin]()

type MyPlugin struct {
    m7s.Plugin
    Foo string
}
  • MyPlugin struct is the plugin definition, Foo is a plugin property that can be configured in the configuration file
  • Must embed m7s.Plugin struct to provide basic plugin functionality
  • m7s.InstallPlugin[MyPlugin](...) registers the plugin so it can be loaded by monibuca

Provide Default Configuration

Example:

go
const defaultConfig = m7s.DefaultYaml(`tcp:
  listenaddr: :5554`)

var _ = m7s.InstallPlugin[MyPlugin](defaultConfig)

3. Implement Event Callbacks (Optional)

Initialization Callback

go
func (config *MyPlugin) OnInit() (err error) {
    // Initialize things
    return
}

Used for plugin initialization after configuration is loaded. Return an error if initialization fails, and the plugin will be disabled.

TCP Request Callback

go
func (config *MyPlugin) OnTCPConnect(conn *net.TCPConn) task.ITask {
    
}

Called when receiving TCP connection requests if TCP listening port is configured.

UDP Request Callback

go
func (config *MyPlugin) OnUDPConnect(conn *net.UDPConn) task.ITask {

}

Called when receiving UDP connection requests if UDP listening port is configured.

QUIC Request Callback

go
func (config *MyPlugin) OnQUICConnect(quic.Connection) task.ITask {

}

Called when receiving QUIC connection requests if QUIC listening port is configured.

4. HTTP Interface Callbacks

Legacy v4 Callback Style

go
func (config *MyPlugin) API_test1(rw http.ResponseWriter, r *http.Request) {
    // do something
}

Accessible via http://ip:port/myplugin/api/test1

Route Mapping Configuration

This method supports parameterized routing:

go
func (config *MyPlugin) RegisterHandler() map[string]http.HandlerFunc {
    return map[string]http.HandlerFunc{
        "/test1/{streamPath...}": config.test1,
    }
}
func (config *MyPlugin) test1(rw http.ResponseWriter, r *http.Request) {
    streamPath := r.PathValue("streamPath")
    // do something
}

5. Implement Push/Pull Clients

Implement Push Client

Push client needs to implement IPusher interface and pass the creation method to InstallPlugin.

go
type Pusher struct {
    pullCtx m7s.PullJob
}

func (c *Pusher) GetPullJob() *m7s.PullJob {
    return &c.pullCtx
}

func NewPusher(_ config.Push) m7s.IPusher {
    return &Pusher{}
}
var _ = m7s.InstallPlugin[MyPlugin](NewPusher)

Implement Pull Client

Pull client needs to implement IPuller interface and pass the creation method to InstallPlugin. The following Puller inherits from m7s.HTTPFilePuller for basic file and HTTP pulling:

go
type Puller struct {
    m7s.HTTPFilePuller
}

func NewPuller(_ config.Pull) m7s.IPuller {
    return &Puller{}
}
var _ = m7s.InstallPlugin[MyPlugin](NewPuller)

6. Implement gRPC Service

Create myplugin.proto in pb Directory

proto
syntax = "proto3";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
package myplugin;
option go_package="m7s.live/v5/plugin/myplugin/pb";

service api {
    rpc MyMethod (MyRequest) returns (MyResponse) {
        option (google.api.http) = {
            post: "/myplugin/api/bar"
            body: "foo"
        };
    }
}
message MyRequest {
    string foo = 1;
}
message MyResponse {
    string bar = 1;
}

Generate gRPC Code

Add to VSCode task.json:

json
{
    "type": "shell",
    "label": "build pb myplugin",
    "command": "protoc",
    "args": [
        "-I.",
        "-I${workspaceRoot}/pb",
        "--go_out=.",
        "--go_opt=paths=source_relative",
        "--go-grpc_out=.",
        "--go-grpc_opt=paths=source_relative",
        "--grpc-gateway_out=.",
        "--grpc-gateway_opt=paths=source_relative",
        "myplugin.proto"
    ],
    "options": {
        "cwd": "${workspaceRoot}/plugin/myplugin/pb"
    }
}

Or run command in pb directory:

shell
protoc -I. -I$ProjectFileDir$/pb --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out=. --grpc-gateway_opt=paths=source_relative  myplugin.proto

Replace $ProjectFileDir$ with the directory containing global pb files.

Implement gRPC Service

Create api.go:

go
package plugin_myplugin
import (
    "context"
    "m7s.live/m7s/v5"
    "m7s.live/m7s/v5/plugin/myplugin/pb"
)

func (config *MyPlugin) MyMethod(ctx context.Context, req *pb.MyRequest) (*pb.MyResponse, error) {
    return &pb.MyResponse{Bar: req.Foo}, nil
}

Register gRPC Service

go
package plugin_myplugin
import (
    "m7s.live/v5"
    "m7s.live/v5/plugin/myplugin/pb"
)

var _ = m7s.InstallPlugin[MyPlugin](&pb.Api_ServiceDesc, pb.RegisterApiHandler)

type MyPlugin struct {
    pb.UnimplementedApiServer
    m7s.Plugin
    Foo string
}

Additional RESTful Endpoints

Same as v4:

go
func (config *MyPlugin) API_test1(rw http.ResponseWriter, r *http.Request) {
    // do something
}

Accessible via GET request to /myplugin/api/test1

7. Publishing Streams

go
publisher, err = p.Publish(streamPath, connectInfo)

The last two parameters are optional.

After obtaining the publisher, you can publish audio/video data using publisher.WriteAudio and publisher.WriteVideo.

Define Audio/Video Data

If existing audio/video data formats don't meet your needs, you can define custom formats by implementing this interface:

go
IAVFrame interface {
    GetAllocator() *util.ScalableMemoryAllocator
    SetAllocator(*util.ScalableMemoryAllocator)
    Parse(*AVTrack) error
    ConvertCtx(codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error)
    Demux(codec.ICodecCtx) (any, error)
    Mux(codec.ICodecCtx, *AVFrame)
    GetTimestamp() time.Duration
    GetCTS() time.Duration
    GetSize() int
    Recycle()
    String() string
    Dump(byte, io.Writer)
}

Define separate types for audio and video

  • GetAllocator/SetAllocator: Automatically implemented when embedding RecyclableMemory
  • Parse: Identifies key frames, sequence frames, and other important information
  • ConvertCtx: Called when protocol conversion is needed
  • Demux: Called when audio/video data needs to be demuxed
  • Mux: Called when audio/video data needs to be muxed
  • Recycle: Automatically implemented when embedding RecyclableMemory
  • String: Prints audio/video data information
  • GetSize: Gets the size of audio/video data
  • GetTimestamp: Gets the timestamp in nanoseconds
  • GetCTS: Gets the Composition Time Stamp in nanoseconds (PTS = DTS+CTS)
  • Dump: Prints binary audio/video data

8. Subscribing to Streams

go
var suber *m7s.Subscriber
suber, err = p.Subscribe(ctx,streamPath)
go m7s.PlayBlock(suber, handleAudio, handleVideo)

Note that handleAudio and handleVideo are callback functions you need to implement. They take an audio/video format type as input and return an error. If the error is not nil, the subscription is terminated.

9. Prometheus Integration

Just implement the Collector interface, and the system will automatically collect metrics from all plugins:

go
func (p *MyPlugin) Describe(ch chan<- *prometheus.Desc) {
  
}

func (p *MyPlugin) Collect(ch chan<- prometheus.Metric) {
  
}

Released under the AGPL License