class Connection

Header File: <libkafka_asio/connection.h>

Namespace: libkafka_asio

The connection class does the actual interaction with a Kafka server. Use it to connect to such a server and send asynchronous requests to it. The connection uses Boost Asio for the TCP-based communication.

Constructor / Destructor

Connection

Connection(boost::asio::io_service& io_service,
           const Configuration& configuration)

Constructs a new connection object. All communication to the Kafka server will be scheduled on the given io_service object.

~Connection

~Connection()

A possible open connection will be closed on destruction of the connection object. All pending asynchronous operations will be cancelled and the respective handler functions will be called with an operation_aborted error.

Member Functions

AsyncConnect (overload 1 of 2)

void AsyncConnect(const std::string& host,
                  const std::string& service,
                  const ConnectionHandlerType& handler)

Asynchronously connects to the Kafka server, identified by the given hostname and service (port or service string, e.g. see /etc/services under Linux). The given handler function object will be called on success as well as on error. The function always returns immediately.

The signature of the handler function must be:

void handler(
    const Connection::ErrorCodeType& error
);

Example:

boost::asio::io_service ios;
Connection cl(ios);
cl.AsyncConnect("localhost", "9092", [](const Connection::ErrorCodeType& error) {
    if (error) {
        std::cerr << "Failed to connect!" << std::endl;
        return;
    }
    std::cout << "Connected!" << std::endl;
});

AsyncConnect (overload 2 of 2)

void AsyncConnect(const ConnectionHandlerType& handler)

Tries to connect to the brokers, specified in the configuration given to this connection object. If no such broker address was configured, the handler function will be scheduled with ErrorNoBroker. Connection attempts will be made in the sequence, the broker addresses were added to the configuration. The function always returns immediately.

The signature of the handler function must be:

void handler(
    const Connection::ErrorCodeType& error
);

Example:

boost::asio::io_service ios;
Connection::Configuration conf;
conf.auto_connect = true;
conf.AddBrokerFromString("localhost:9092");
conf.AddBrokerFromString("example.org:9092");
Connection cl(ios);
cl.AsyncConnect([](const Connection::ErrorCodeType& error) {
    if (error) {
        std::cerr << "Failed to connect!" << std::endl;
        return;
    }
    std::cout << "Connected!" << std::endl;
});

AsyncRequest

template<typename TRequest>
void AsyncRequest (const TRequest& request,
                   const typename Handler<TRequest>::Type& handler)

Asynchronously sends the given request to the connected Kafka server. The given handler function object will be called on success as well as on error condition.

If this connection object is not in connected state, the handler function will be scheduled with ErrorNotConnected. If the auto-connect option was enabled in the configuration, this function will try to connect to one of the brokers, specified in the configuration (See function AsyncConnect(handler)).

The function always returns immediately. The signature of the handler function must be:

void handler(
    const Connection::ErrorCodeType& error,
    const Response::OptionalType& response
);

The type of the optional response object, handed to the handler function, depends on the given request type. It will be determined using the response type specified in the request type (e.g. FetchRequest::ResponseType).

Example:

boost::asio::io_service ios;
Connection::Configuration conf;
conf.auto_connect = true;
conf.AddBrokerFromString("localhost:9092");
conf.AddBrokerFromString("example.org:9092");
Connection cl(ios);
MetadataRequest request;
connection.AsyncRequest(request, [](const Connection::ErrorCodeType& error,
                                const MetadataResponse::OptionalType& response) {
    if (error) {
        std::cerr << "Error!" << std::endl;
        return;
    }
    std::cout << "Got Metadata!" << std::endl;
});

Close

void Close()

Closes the connection to the Kafka server. All asynchronous operations will be cancelled immediately with an operation_aborted error.

Types

Configuration

typedef ConnectionConfiguration Configuration

Connection configuration type.

ErrorCodeType

typedef boost::system::error_code ErrorCodeType

Error code type.

ConnectionHandlerType

typedef boost::function<void(const ErrorCodeType&)> ConnectionHandlerType

Handler type for connection attempts.

Handler<TRequest>::Type

template<typename TRequest> Handler<TRequest>::Type

Handler type for asynchronous requests.