class Client

Header File: <libkafka_asio/client.h>

Namespace: libkafka_asio

The Client class handles all interaction with a Kafka server. Use it to connect to a server and to send asynchronous requests to it. The client uses Boost Asio for its TCP-based communication.

Client

Client(boost::asio::io_service& io_service, 
       const Configuration& configuration)

Constructs a new client object. All communication with the Kafka server will be scheduled on the given io_service object.

~Client

~Client()

Any open connection is closed when the client object is destroyed. All pending asynchronous operations are cancelled, and their handler functions are called with an operation_aborted error.

Member Functions

AsyncConnect (overload 1 of 2)

void AsyncConnect(const std::string& host,
                  const std::string& service,
                  const ConnectionHandlerType& handler)

Asynchronously connects to the Kafka server identified by the given hostname and service. The service is either a port number or a service name (for example, one listed in /etc/services on Linux). The given handler function object will be called on success as well as on error. The function always returns immediately.

The signature of the handler function must be:

void handler(
    const Client::ErrorCodeType& error
);

Example:

boost::asio::io_service ios;
Client::Configuration conf;
Client cl(ios, conf);
cl.AsyncConnect("localhost", "9092", [](const Client::ErrorCodeType& error) {
    if (error) {
        std::cerr << "Failed to connect!" << std::endl;
        return;
    }
    std::cout << "Connected!" << std::endl;
});
ios.run();  // handlers are only invoked while the io_service is running

AsyncConnect (overload 2 of 2)

void AsyncConnect(const ConnectionHandlerType& handler)

Tries to connect to the brokers specified in the configuration given to this client object. If no broker address was configured, the handler function will be scheduled with ErrorNoBroker. Connection attempts are made in the order in which the broker addresses were added to the configuration. The function always returns immediately.

The signature of the handler function must be:

void handler(
    const Client::ErrorCodeType& error
);

Example:

boost::asio::io_service ios;
Client::Configuration conf;
conf.auto_connect = true;
conf.AddBrokerFromString("localhost:9092");
conf.AddBrokerFromString("example.org:9092");
Client cl(ios, conf);  // pass the configuration that holds the broker list
cl.AsyncConnect([](const Client::ErrorCodeType& error) {
    if (error) {
        std::cerr << "Failed to connect!" << std::endl;
        return;
    }
    std::cout << "Connected!" << std::endl;
});
ios.run();  // handlers are only invoked while the io_service is running
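
The connection order described for this overload can be illustrated without any networking. The following is a minimal sketch, not libkafka_asio's implementation: Broker, ConnectResult, TryBrokersInOrder and the try_connect callback are hypothetical names introduced here, and the real client performs its attempts asynchronously on the io_service. The AllFailed outcome is also an assumption, since the text above does not say which error is reported when every broker is unreachable.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; these are not libkafka_asio types.
struct Broker {
    std::string host;
    std::string service;
};

enum class ConnectResult { Connected, NoBroker, AllFailed };

// Tries each broker in the order it was added and stops at the first success.
// `try_connect` stands in for a real (asynchronous) connection attempt.
// On success, `connected_index` receives the position of the broker used.
inline ConnectResult TryBrokersInOrder(
    const std::vector<Broker>& brokers,
    const std::function<bool(const Broker&)>& try_connect,
    std::size_t& connected_index)
{
    if (brokers.empty()) {
        return ConnectResult::NoBroker;  // analogous to ErrorNoBroker
    }
    for (std::size_t i = 0; i < brokers.size(); ++i) {
        if (try_connect(brokers[i])) {
            connected_index = i;
            return ConnectResult::Connected;
        }
    }
    return ConnectResult::AllFailed;  // assumed outcome, see note above
}
```

With the two brokers from the example above, a callback that only accepts "example.org" would report success at index 1, after the attempt on "localhost" has failed.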

AsyncRequest

template<typename TRequest>
void AsyncRequest(const TRequest& request,
                  const typename Handler<TRequest>::Type& handler)

Asynchronously sends the given request to the connected Kafka server. The given handler function object will be called on success as well as on error condition.

If this client object is not connected and auto-connect is disabled, the handler function will be scheduled with ErrorNotConnected. If the auto-connect option is enabled in the configuration, this function first tries to connect to one of the brokers specified in the configuration (see AsyncConnect, overload 2 of 2).

The function always returns immediately. The signature of the handler function must be:

void handler(
    const Client::ErrorCodeType& error,
    const Response::OptionalType& response
);

The type of the optional response object, handed to the handler function, depends on the given request type. It will be determined using the response type specified in the request type (e.g. FetchRequest::ResponseType).

Example:

boost::asio::io_service ios;
Client::Configuration conf;
conf.auto_connect = true;
conf.AddBrokerFromString("localhost:9092");
conf.AddBrokerFromString("example.org:9092");
Client cl(ios, conf);  // pass the configuration so auto-connect can find a broker
MetadataRequest request;
cl.AsyncRequest(request, [](const Client::ErrorCodeType& error,
                            const MetadataResponse::OptionalType& response) {
    if (error) {
        std::cerr << "Error!" << std::endl;
        return;
    }
    std::cout << "Got Metadata!" << std::endl;
});
ios.run();  // handlers are only invoked while the io_service is running

Close

void Close()

Closes the connection to the Kafka server. All pending asynchronous operations will be cancelled immediately, and their handlers will be called with an operation_aborted error.
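
Because cancellation is reported through the same error parameter as real failures, a handler may want to tell the two apart. The sketch below shows one way to do that. It is an illustration under assumptions: DescribeCompletion is a hypothetical helper, and std::error_code with std::errc::operation_canceled stands in for Client::ErrorCodeType so that the snippet compiles without Boost. In code that uses Boost Asio, the comparison would be made against boost::asio::error::operation_aborted instead.

```cpp
#include <string>
#include <system_error>

// std::error_code is a stand-in for Client::ErrorCodeType
// (boost::system::error_code); the helper name is hypothetical.
inline std::string DescribeCompletion(const std::error_code& error)
{
    if (!error) {
        return "ok";
    }
    if (error == std::errc::operation_canceled) {
        // Expected after Close() or destruction of the client;
        // usually not something to report as a failure.
        return "cancelled";
    }
    return "failed: " + error.message();
}
```

A handler could call such a helper first and return silently on "cancelled", so that shutting down the client does not produce spurious error output.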

Types

Configuration

typedef ClientConfiguration Configuration

Client configuration type.

ErrorCodeType

typedef boost::system::error_code ErrorCodeType

Error code type.

ConnectionHandlerType

typedef boost::function<void(const ErrorCodeType&)> ConnectionHandlerType

Handler type for connection attempts.

Handler<TRequest>::Type

template<typename TRequest> Handler<TRequest>::Type

Handler type for asynchronous requests.
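
The relationship between a request type and its handler signature can be sketched as a small type trait. This is an illustration of the idea only, not the library's definition: ExampleRequest and ExampleResponse are hypothetical types, std::function and std::error_code stand in for their Boost counterparts, and std::shared_ptr is used as a simple nullable stand-in for the optional response type.

```cpp
#include <functional>
#include <memory>
#include <system_error>
#include <type_traits>

// Hypothetical request/response pair; not the library's real definitions.
struct ExampleResponse {
    // Nullable stand-in for the optional response handed to handlers.
    typedef std::shared_ptr<const ExampleResponse> OptionalType;
    int broker_count;
};

struct ExampleRequest {
    typedef ExampleResponse ResponseType;  // the request names its response type
};

// Trait in the spirit of Handler<TRequest>::Type: the handler signature is
// derived from the response type declared by the request type.
template <typename TRequest>
struct Handler {
    typedef typename TRequest::ResponseType ResponseType;
    typedef std::function<void(const std::error_code&,
                               const typename ResponseType::OptionalType&)> Type;
};

// Compile-time check that the trait picks up the declared response type.
static_assert(
    std::is_same<Handler<ExampleRequest>::ResponseType, ExampleResponse>::value,
    "handler type must follow the request's ResponseType");
```

With such a trait, a function template like AsyncRequest only needs the request type as a template parameter; the matching handler signature follows from it.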