transport

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 9, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package transport provides functionality for communicating between members.

Constants

This section is empty.

Variables

var (
	TransmitBytes = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "membership_list_transport_transmit_bytes_total",
			Help: "Total number of bytes transmitted.",
		},
		[]string{"transport"},
	)
	TransmitErrors = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "membership_list_transport_transmit_errors_total",
			Help: "Total number of errors during transmit.",
		},
		[]string{"transport"},
	)
	ReceiveBytes = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "membership_list_transport_receive_bytes_total",
			Help: "Total number of bytes received.",
		},
		[]string{"transport"},
	)
	ReceiveErrors = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "membership_list_transport_receive_errors_total",
			Help: "Total number of errors during receive.",
		},
		[]string{"transport"},
	)
	Encryptions = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "membership_list_transport_encryptions_total",
			Help: "Total number of encryption operations performed.",
		},
		[]string{"transport"},
	)
	Decryptions = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "membership_list_transport_decryptions_total",
			Help: "Total number of decryption operations performed.",
		},
		[]string{"transport"},
	)
)

Functions

func RegisterMetrics

func RegisterMetrics(registerer prometheus.Registerer) error

RegisterMetrics registers all metrics collectors with the given prometheus registerer.

Types

type Discard

type Discard struct{}

Discard provides a client transport which discards all data and always reports success. This is useful for tests and benchmarks, when we do not want to send network messages for real.

func (*Discard) Send

func (d *Discard) Send(address encoding.Address, buffer []byte) error

type Error

type Error struct{}

Error provides a client transport which always returns an error. This is useful for tests which need to exercise the handling of failed sends without involving a real network.

func (*Error) Send

func (d *Error) Send(address encoding.Address, buffer []byte) error
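
The following is a minimal sketch of how transports like Discard and Error can stand in for a real network in tests. It does not import this package: Address is a hypothetical stand-in for encoding.Address, the discard and failing types only imitate the documented behavior, and broadcast is a made-up caller under test.

```go
package main

import (
	"errors"
	"fmt"
)

// Address is a hypothetical stand-in for encoding.Address.
type Address string

// Transport mirrors the Transport interface documented in this package.
type Transport interface {
	Send(address Address, buffer []byte) error
}

// discard drops every buffer and reports success, as Discard is described to do.
type discard struct{}

func (d *discard) Send(address Address, buffer []byte) error { return nil }

// failing rejects every send, as Error is described to do.
type failing struct{}

func (f *failing) Send(address Address, buffer []byte) error {
	return errors.New("send failed")
}

// broadcast is a hypothetical caller under test.
// It sends the buffer to every address and returns how many sends succeeded.
func broadcast(t Transport, addresses []Address, buffer []byte) int {
	succeeded := 0
	for _, address := range addresses {
		if err := t.Send(address, buffer); err == nil {
			succeeded++
		}
	}
	return succeeded
}

func main() {
	addresses := []Address{"a", "b", "c"}
	fmt.Println(broadcast(&discard{}, addresses, []byte("ping"))) // prints 3
	fmt.Println(broadcast(&failing{}, addresses, []byte("ping"))) // prints 0
}
```

Because the caller only depends on the Transport interface, the success path and the failure path can both be covered without opening a socket.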

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

Memory provides a transport implementation which moves network messages through memory between members. This is helpful for tests where you want to simulate the interaction between multiple members without having any real network involved. This transport can guarantee that network messages were delivered through its Flush calls. A normal network stack cannot provide such guarantees.

func NewMemory

func NewMemory() *Memory

NewMemory creates a new Memory instance.

func (*Memory) AddPendingSend

func (m *Memory) AddPendingSend(address encoding.Address, buffer []byte)

AddPendingSend queues the buffer for later delivery to the given address.

func (*Memory) AddTarget

func (m *Memory) AddTarget(address encoding.Address, target Target)

AddTarget adds the target with the given address. The target will then receive network messages from clients.

func (*Memory) Addresses

func (m *Memory) Addresses() []encoding.Address

Addresses returns a list with all addresses which are registered with this instance.

func (*Memory) Client

func (m *Memory) Client() *MemoryClient

Client returns a client transport for sending network messages through the memory transport.

func (*Memory) FlushAllPendingSends

func (m *Memory) FlushAllPendingSends() error

FlushAllPendingSends flushes all messages for all registered targets.

func (*Memory) FlushPendingSends

func (m *Memory) FlushPendingSends(address encoding.Address) (bool, error)

FlushPendingSends dispatches all pending sends to the target with the given address. If no target is registered for the given address, all pending sends are dropped.
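
The queue-then-flush behavior described for Memory can be sketched roughly as follows. This is not the package's implementation: Address stands in for encoding.Address, the memory type and its lowercase methods are hypothetical, and the meaning given to the boolean result (whether anything was queued) is an assumption, since the documentation does not define it.

```go
package main

import "fmt"

// Address is a hypothetical stand-in for encoding.Address.
type Address string

// Target mirrors the Target interface documented in this package.
type Target interface {
	DispatchDatagram(buffer []byte) error
}

// memory is a simplified, single-goroutine sketch of an in-memory transport.
type memory struct {
	targets map[Address]Target
	pending map[Address][][]byte
}

func newMemory() *memory {
	return &memory{
		targets: map[Address]Target{},
		pending: map[Address][][]byte{},
	}
}

func (m *memory) addTarget(address Address, target Target) {
	m.targets[address] = target
}

// addPendingSend queues a copy of the buffer, so later changes by the caller are not observed.
func (m *memory) addPendingSend(address Address, buffer []byte) {
	m.pending[address] = append(m.pending[address], append([]byte(nil), buffer...))
}

// flushPendingSends delivers everything queued for the address in order.
// Assumption: the bool reports whether anything was queued.
func (m *memory) flushPendingSends(address Address) (bool, error) {
	queued := m.pending[address]
	delete(m.pending, address)
	if len(queued) == 0 {
		return false, nil
	}
	target, ok := m.targets[address]
	if !ok {
		return true, nil // no target registered: the queued sends are dropped
	}
	for _, buffer := range queued {
		if err := target.DispatchDatagram(buffer); err != nil {
			return true, err
		}
	}
	return true, nil
}

// recorder is a Target which remembers what it received.
type recorder struct{ received []string }

func (r *recorder) DispatchDatagram(buffer []byte) error {
	r.received = append(r.received, string(buffer))
	return nil
}

func main() {
	m := newMemory()
	r := &recorder{}
	m.addTarget("member-a", r)
	m.addPendingSend("member-a", []byte("ping"))
	fmt.Println(len(r.received)) // prints 0, nothing is delivered before the flush
	if _, err := m.flushPendingSends("member-a"); err != nil {
		fmt.Println("flush failed:", err)
	}
	fmt.Println(r.received) // prints [ping]
}
```

Delivery only happens inside the flush call, which is what lets a test assert on the receiver's state immediately afterwards.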

type MemoryClient

type MemoryClient struct {
	// contains filtered or unexported fields
}

MemoryClient is a client which is used to communicate with other targets through the memory transport.

func (*MemoryClient) Send

func (m *MemoryClient) Send(address encoding.Address, buffer []byte) error

type Store

type Store struct {
	Addresses []encoding.Address
	Buffers   [][]byte
}

Store provides a client transport which stores all data and always reports success. This is useful for tests, when we need to check if some specific data was transmitted.

func (*Store) Clear

func (s *Store) Clear()

func (*Store) Send

func (s *Store) Send(address encoding.Address, buffer []byte) error
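
A storing transport of this kind could look roughly like the sketch below, which a test can inspect after the code under test has run. The store type is a hypothetical imitation, Address stands in for encoding.Address, and copying the buffer on Send is an assumption rather than documented behavior.

```go
package main

import "fmt"

// Address is a hypothetical stand-in for encoding.Address.
type Address string

// store records every send and reports success, as Store is described to do.
type store struct {
	Addresses []Address
	Buffers   [][]byte
}

// Send appends the address and a copy of the buffer.
// Assumption: copying protects the record from callers which reuse their buffer.
func (s *store) Send(address Address, buffer []byte) error {
	s.Addresses = append(s.Addresses, address)
	s.Buffers = append(s.Buffers, append([]byte(nil), buffer...))
	return nil
}

// Clear forgets everything recorded so far.
func (s *store) Clear() {
	s.Addresses = nil
	s.Buffers = nil
}

func main() {
	s := &store{}
	_ = s.Send("member-a", []byte("ping"))
	_ = s.Send("member-b", []byte("ack"))
	for i, address := range s.Addresses {
		fmt.Printf("%s <- %s\n", address, s.Buffers[i])
	}
	s.Clear()
	fmt.Println(len(s.Addresses)) // prints 0
}
```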

type TCPClient

type TCPClient struct {
	// contains filtered or unexported fields
}

TCPClient provides reliable transport for sending data to a member.

TCPClient is not safe for concurrent use by multiple goroutines. Callers must serialize access to all methods. The membership.List always calls this client while holding its own lock, which provides that serialization.
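
For callers outside of membership.List, one way to provide the required serialization is a small wrapper which guards every Send with a mutex. This is only a sketch: Address stands in for encoding.Address, and the locked and counter types as well as sendConcurrently are hypothetical helpers, not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// Address is a hypothetical stand-in for encoding.Address.
type Address string

// Transport mirrors the Transport interface documented in this package.
type Transport interface {
	Send(address Address, buffer []byte) error
}

// counter is deliberately not safe for concurrent use, like the documented clients.
type counter struct{ sends int }

func (c *counter) Send(address Address, buffer []byte) error {
	c.sends++
	return nil
}

// locked serializes all calls to the wrapped transport.
type locked struct {
	mu    sync.Mutex
	inner Transport
}

func (l *locked) Send(address Address, buffer []byte) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.inner.Send(address, buffer)
}

// sendConcurrently issues n sends from n goroutines and waits for all of them.
func sendConcurrently(t Transport, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = t.Send("member", []byte("ping"))
		}()
	}
	wg.Wait()
}

func main() {
	c := &counter{}
	sendConcurrently(&locked{inner: c}, 100)
	fmt.Println(c.sends) // prints 100
}
```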

func NewTCPClient

func NewTCPClient(key encryption.Key) (*TCPClient, error)

NewTCPClient creates a new TCPClient transport.

func (*TCPClient) Send

func (c *TCPClient) Send(address encoding.Address, buffer []byte) error

Send transmits the given buffer to the member with the given address.

type TCPServer

type TCPServer struct {
	// contains filtered or unexported fields
}

TCPServer provides reliable transport for receiving data from members.

TCPServer is safe for concurrent use by multiple goroutines. Access to shared state is internally synchronized.

func NewTCPServer

func NewTCPServer(logger logr.Logger, target Target, bindAddress string, keys []encryption.Key) (*TCPServer, error)

NewTCPServer creates a new TCPServer transport.

func (*TCPServer) Addr

func (t *TCPServer) Addr() (encoding.Address, error)

Addr returns the address the server is listening on.

func (*TCPServer) Shutdown

func (t *TCPServer) Shutdown() error

Shutdown ends the server and waits for all connections to be closed.

func (*TCPServer) Startup

func (t *TCPServer) Startup() error

Startup starts the server and listens for incoming connections.

type Target

type Target interface {
	DispatchDatagram(buffer []byte) error
}

Target is the interface which the target needs to implement for processing incoming network messages.

type Transport

type Transport interface {
	Send(address encoding.Address, buffer []byte) error
}

Transport is the interface the transport needs to implement for transmitting data between members.

type UDPClient

type UDPClient struct {
	// contains filtered or unexported fields
}

UDPClient provides unreliable transport for sending data to a member.

UDPClient is not safe for concurrent use by multiple goroutines. Callers must serialize access to all methods. The membership.List always calls this client while holding its own lock, which provides that serialization.

func NewUDPClient

func NewUDPClient(maxDatagramLength int, key encryption.Key) (*UDPClient, error)

NewUDPClient creates a new UDPClient transport.

func (*UDPClient) Send

func (c *UDPClient) Send(address encoding.Address, buffer []byte) error

Send transmits the given buffer to the member with the given address. The length of the buffer is validated against the max datagram length provided during construction. If the length exceeds the maximum length, no data is sent and an error is returned.

type UDPServer

type UDPServer struct {
	// contains filtered or unexported fields
}

UDPServer provides unreliable transport for receiving data from members.

UDPServer is safe for concurrent use by multiple goroutines. As the background task reads and processes one UDP message after the other, there is no special need for serialization.

func NewUDPServer

func NewUDPServer(logger logr.Logger, target Target, bindAddress string, receiveBufferLength int, keys []encryption.Key) (*UDPServer, error)

NewUDPServer creates a new UDPServer.

func (*UDPServer) Addr

func (t *UDPServer) Addr() (encoding.Address, error)

Addr returns the address the server is listening on.

func (*UDPServer) Shutdown

func (t *UDPServer) Shutdown() error

Shutdown ends the server and waits for all data to be processed.

func (*UDPServer) Startup

func (t *UDPServer) Startup() error

Startup starts the server and listens for incoming data.

type Unreliable

type Unreliable struct {
	// Transport is the transport to forward calls to.
	Transport Transport

	// Reliability is the fraction of send requests which are forwarded to the transport. The value ranges from 0.0 to
	// 1.0: with 0.0 sends are never forwarded, with 0.5 sends are forwarded 50% of the time and with 1.0 sends are
	// always forwarded.
	Reliability float64
}

Unreliable provides a client transport which forwards the call to some other transport only some of the time. Otherwise the transmission is silently dropped. This helps in simulating a lossy network.

func (*Unreliable) Send

func (u *Unreliable) Send(address encoding.Address, buffer []byte) error
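
A probabilistic forwarder along these lines could be sketched as shown below. It is not the package's implementation: Address stands in for encoding.Address, the unreliable type only imitates the documented fields, and the seeded rng field with its newUnreliable constructor is an assumption added here to make runs repeatable.

```go
package main

import (
	"fmt"
	"math/rand"
)

// Address is a hypothetical stand-in for encoding.Address.
type Address string

// Transport mirrors the Transport interface documented in this package.
type Transport interface {
	Send(address Address, buffer []byte) error
}

// counter counts the sends which reach it.
type counter struct{ sends int }

func (c *counter) Send(address Address, buffer []byte) error {
	c.sends++
	return nil
}

// unreliable forwards roughly a Reliability fraction of all sends.
type unreliable struct {
	Transport   Transport
	Reliability float64
	rng         *rand.Rand // assumption: a seeded source for repeatable tests
}

func newUnreliable(inner Transport, reliability float64, seed int64) *unreliable {
	return &unreliable{
		Transport:   inner,
		Reliability: reliability,
		rng:         rand.New(rand.NewSource(seed)),
	}
}

func (u *unreliable) Send(address Address, buffer []byte) error {
	// Float64 returns a value in [0.0, 1.0), so 0.0 never forwards and 1.0 always forwards.
	if u.rng.Float64() >= u.Reliability {
		return nil // dropped silently: the caller still sees success
	}
	return u.Transport.Send(address, buffer)
}

func main() {
	c := &counter{}
	lossy := newUnreliable(c, 0.5, 1)
	for i := 0; i < 1000; i++ {
		_ = lossy.Send("member", []byte("ping"))
	}
	fmt.Printf("forwarded %d of 1000 sends\n", c.sends)
}
```

Returning nil for a dropped send matches the description of a silent drop: the sender cannot tell a lost message from a delivered one, just as with a real lossy network.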
