Documentation
¶
Index ¶
- Constants
- Variables
- func GetDigest(r io.Reader, digestAlgorithm DigestAlgorithm) (string, error)
- func GetNextIP(availableIPs *availableIPs) net.IP
- func IsDigestSupported(algorithm string) bool
- func WithFeedbackChannel(ctx context.Context, feedbackChan chan struct{}) context.Context
- func WithWrappedConnection(ctx context.Context, wrappedConnChan chan *CustomConnection) context.Context
- type CustomConnection
- type CustomHTTPClient
- type DedupeOptions
- type DigestAlgorithm
- type DiscardHook
- type DiscardHookError
- type Error
- type GzipReaderInterface
- type GzipWriterInterface
- type HTTPClientSettings
- type Header
- type ReadOpts
- type Reader
- type Record
- type RecordBatch
- type RotatorSettings
- type WaitGroupWithCount
- type Writer
Constants ¶
const (
	// ContextKeyFeedback is the context key for the feedback channel.
	// When provided, the channel will receive a signal once the WARC record
	// has been written to disk, making WARC writing synchronous.
	// Use the WithFeedbackChannel() helper function for convenience.
	ContextKeyFeedback contextKey = "feedback"

	// ContextKeyWrappedConn is the context key for the wrapped connection channel.
	// This is used internally to retrieve the wrapped connection for advanced use cases.
	// Use the WithWrappedConnection() helper function for convenience.
	ContextKeyWrappedConn contextKey = "wrappedConn"
)
Variables ¶
var (
	IPv6 *availableIPs
	IPv4 *availableIPs
)
var (
	// Counters for tracking various stats
	DataTotal                    atomic.Int64
	CDXDedupeTotalBytes          atomic.Int64
	DoppelgangerDedupeTotalBytes atomic.Int64
	LocalDedupeTotalBytes        atomic.Int64
	CDXDedupeTotal               atomic.Int64
	DoppelgangerDedupeTotal      atomic.Int64
	LocalDedupeTotal             atomic.Int64
)
var DedupeHTTPClient = http.Client{ Timeout: 10 * time.Second, Transport: &http.Transport{ Dial: (&net.Dialer{ Timeout: 5 * time.Second, }).Dial, TLSHandshakeTimeout: 5 * time.Second, }, }
TODO: Add stats on how long dedupe HTTP requests take
var ErrUnknownDigestAlgorithm = errors.New("unknown digest algorithm")
Functions ¶
func GetDigest ¶ added in v0.8.86
func GetDigest(r io.Reader, digestAlgorithm DigestAlgorithm) (string, error)
func IsDigestSupported ¶ added in v0.8.86
func IsDigestSupported(algorithm string) bool
func WithFeedbackChannel ¶ added in v0.8.92
func WithFeedbackChannel(ctx context.Context, feedbackChan chan struct{}) context.Context
WithFeedbackChannel adds a feedback channel to the request context. When provided, the channel will receive a signal once the WARC record has been written to disk, making WARC writing synchronous. Without this, WARC writing is asynchronous.
Example:
feedbackChan := make(chan struct{}, 1)
req = req.WithContext(warc.WithFeedbackChannel(req.Context(), feedbackChan))
// ... perform request ...
<-feedbackChan // blocks until WARC is written
func WithWrappedConnection ¶ added in v0.8.92
func WithWrappedConnection(ctx context.Context, wrappedConnChan chan *CustomConnection) context.Context
WithWrappedConnection adds a wrapped connection channel to the request context. This is used for advanced use cases where direct access to the connection is needed.
Types ¶
type CustomConnection ¶ added in v0.8.82
type CustomConnection struct {
net.Conn
io.Reader
io.Writer
sync.WaitGroup
// contains filtered or unexported fields
}
func (*CustomConnection) Close ¶ added in v0.8.82
func (cc *CustomConnection) Close() error
func (*CustomConnection) CloseWithError ¶ added in v0.8.82
func (cc *CustomConnection) CloseWithError(err error) error
type CustomHTTPClient ¶
type CustomHTTPClient struct {
WaitGroup *WaitGroupWithCount
ErrChan chan *Error
WARCWriter chan *RecordBatch
http.Client
TempDir string
WARCWriterDoneChannels []chan bool
DiscardHook DiscardHook
TLSHandshakeTimeout time.Duration
ConnReadDeadline time.Duration
MaxReadBeforeTruncate int
FullOnDisk bool
DigestAlgorithm DigestAlgorithm
// MaxRAMUsageFraction is the fraction of system RAM above which we'll force spooling to disk. For example, 0.5 = 50%.
// If set to <= 0, the default value is DefaultMaxRAMUsageFraction.
MaxRAMUsageFraction float64
DataTotal *atomic.Int64
CDXDedupeTotalBytes *atomic.Int64
DoppelgangerDedupeTotalBytes *atomic.Int64
LocalDedupeTotalBytes *atomic.Int64
CDXDedupeTotal *atomic.Int64
DoppelgangerDedupeTotal *atomic.Int64
LocalDedupeTotal *atomic.Int64
// contains filtered or unexported fields
}
func NewWARCWritingHTTPClient ¶
func NewWARCWritingHTTPClient(HTTPClientSettings HTTPClientSettings) (httpClient *CustomHTTPClient, err error)
func (*CustomHTTPClient) Close ¶
func (c *CustomHTTPClient) Close() error
func (*CustomHTTPClient) WriteRecord ¶
func (c *CustomHTTPClient) WriteRecord(WARCTargetURI, WARCType, contentType, payloadString string, payloadReader io.Reader)
type DedupeOptions ¶
type DigestAlgorithm ¶ added in v0.8.86
type DigestAlgorithm int
const (
	SHA1 DigestAlgorithm = iota
	// According to IIPC, lowercase base 16 is the "popular" encoding for SHA256
	SHA256Base16
	SHA256Base32
	BLAKE3
)
func GetDigestFromPrefix ¶ added in v0.8.86
func GetDigestFromPrefix(prefix string) DigestAlgorithm
type DiscardHook ¶
DiscardHook is a hook function that, if set, is called for each response. It can be used to determine whether the response should be discarded. Returns:
- bool: should the response be discarded
- string: (optional) why the response was discarded or not
type DiscardHookError ¶
type DiscardHookError struct {
URL string
Reason string // reason for discarding
Err error // nil: discarded successfully
}
func (*DiscardHookError) Error ¶
func (e *DiscardHookError) Error() string
func (*DiscardHookError) Unwrap ¶
func (e *DiscardHookError) Unwrap() error
type GzipReaderInterface ¶ added in v0.8.90
type GzipReaderInterface interface {
io.ReadCloser
Multistream(enable bool)
Reset(r io.Reader) error
}
GzipReaderInterface defines the interface for gzip readers. This allows us to switch between standard gzip and klauspost gzip based on build tags.
type GzipWriterInterface ¶ added in v0.8.90
type GzipWriterInterface interface {
io.WriteCloser
Flush() error
}
GzipWriterInterface defines the interface for gzip writers. This allows us to switch between standard gzip and klauspost gzip based on build tags.
type HTTPClientSettings ¶
type HTTPClientSettings struct {
RotatorSettings *RotatorSettings
Proxy string
TempDir string
DiscardHook DiscardHook
DNSServers []string
DedupeOptions DedupeOptions
DialTimeout time.Duration
ResponseHeaderTimeout time.Duration
DNSResolutionTimeout time.Duration
DNSRecordsTTL time.Duration
DNSCacheSize int
DNSConcurrency int
TLSHandshakeTimeout time.Duration
ConnReadDeadline time.Duration
MaxReadBeforeTruncate int
DecompressBody bool
FollowRedirects bool
FullOnDisk bool
MaxRAMUsageFraction float64
VerifyCerts bool
RandomLocalIP bool
DisableIPv4 bool
DisableIPv6 bool
IPv6AnyIP bool
DigestAlgorithm DigestAlgorithm
}
type Header ¶
Header provides information about the WARC record. It stores WARC record field names and their values. Since WARC field names are case-insensitive, the Header methods are case-insensitive as well.
func (Header) Del ¶
Del deletes the value associated with key. The key is compared in a case-insensitive manner.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader stores the bufio.Reader and gzip.Reader for a WARC file.
type Record ¶
type Record struct {
Header Header
Content spooledtempfile.ReadWriteSeekCloser
Version string // WARC/1.0, WARC/1.1 ...
Offset int64 // Offset of the record start (-1 if WARC file type is not supported yet)
Size int64 // COMPRESSED size of the record (gzip member): header + deflate data + trailer. (-1 if WARC file type is not supported yet)
}
Record represents a WARC record.
type RecordBatch ¶
RecordBatch is a structure that contains a group of records to be written at the same time, along with a common capture timestamp. FeedbackChan is used to signal when the records have been written.
func NewRecordBatch ¶
func NewRecordBatch(feedbackChan chan struct{}) *RecordBatch
NewRecordBatch creates a record batch and initializes its capture time.
type RotatorSettings ¶
type RotatorSettings struct {
// Content of the warcinfo record that will be written
// to all WARC files
WarcinfoContent Header
// Prefix used for WARC filenames, WARC 1.1 specifications
// recommend to name files this way:
// Prefix-Timestamp-Serial-Crawlhost.warc.gz
Prefix string
// Compression algorithm to use
Compression string
// Path to a ZSTD compression dictionary to embed (and use) in .warc.zst files
CompressionDictionary string
// Directory where the created WARC files will be stored,
// default will be the current directory
OutputDirectory string
// WARCSize is in Megabytes
WARCSize float64
// WARCWriterPoolSize defines the number of parallel WARC writers
WARCWriterPoolSize int
// contains filtered or unexported fields
}
RotatorSettings is used to store the settings needed by recordWriter to write WARC files.
func NewRotatorSettings ¶
func NewRotatorSettings() *RotatorSettings
NewRotatorSettings creates a RotatorSettings structure and initializes it with default values.
func (*RotatorSettings) NewWARCRotator ¶
func (s *RotatorSettings) NewWARCRotator() (recordWriterChan chan *RecordBatch, doneChannels []chan bool, err error)
NewWARCRotator creates and returns a channel that can be used to send records to the recordWriter function, which runs in a goroutine and writes them to WARC files.
type WaitGroupWithCount ¶
func (*WaitGroupWithCount) Add ¶
func (wg *WaitGroupWithCount) Add(delta int)
func (*WaitGroupWithCount) Done ¶
func (wg *WaitGroupWithCount) Done()
func (*WaitGroupWithCount) Size ¶
func (wg *WaitGroupWithCount) Size() int
type Writer ¶
type Writer struct {
GZIPWriter GzipWriterInterface
ZSTDWriter *zstd.Encoder
FileWriter *bufio.Writer
FileName string
Compression string
DigestAlgorithm DigestAlgorithm
ParallelGZIP bool
}
Writer writes WARC records to WARC files.
func NewWriter ¶
func NewWriter(writer io.Writer, fileName string, digestAlgorithm DigestAlgorithm, compression string, contentLengthHeader string, newFileCreation bool, dictionary []byte) (*Writer, error)
NewWriter creates a new WARC writer.
func (*Writer) CloseCompressedWriter ¶
func (*Writer) WriteInfoRecord ¶
WriteInfoRecord can be used to write an information record to the WARC file.
func (*Writer) WriteRecord ¶
WriteRecord writes a record to the underlying WARC file. A record consists of a version string, the record header, a record content block, and two trailing newlines:
Version CRLF Header-Key: Header-Value CRLF CRLF Content CRLF CRLF