LogStash-Cluster

From James Dooley's Wiki


A good LogStash cluster has several moving parts. The exact number depends on the setup, but the basic components are:

Message Queue (RabbitMQ)
Indexer (LogStash)
Storage (ElasticSearch)

Optionally, you can also use GrayLog2, which adds MongoDB to the mix.

The general flow will be either:

Shipper -> RabbitMQ -> LogStash -> ElasticSearch

or

Shipper -> RabbitMQ -> LogStash -> GrayLog2 -> ElasticSearch, with MongoDB used for stats

The nice thing about either setup is that every part can be clustered to provide an HA-type solution. Full HA requires a few tweaks and a rebuild of LogStash from source.

RabbitMQ Cluster

RabbitMQ is the first hop in the system. It functions as a collection and temporary storage location for all incoming messages.

By default LogStash does not expect RabbitMQ to provide any HA features. In order to use HA (mirrored) queues you need to make some small changes to LogStash's source and rebuild. You can either do a full build from Git or, since the change is a small one to Ruby code, open the .jar file and edit its contents.

The main change is that we need to tell RabbitMQ, when we create the queue, that it should be HA. Otherwise the queue is owned by a single server and is not replicated.

RabbitMQ Configuration

By default RabbitMQ does not need a configuration file; its default settings are all you need for a basic single-server setup or a cluster of servers. If you want to configure RabbitMQ manually, you can create the file /etc/rabbitmq/rabbitmq.config.

The only change you need to make to get clustering working is to copy the Erlang cookie to all of the clustered servers; nodes with the same cookie will talk to each other. This communication happens over multicast, so if multicast is not allowed between the servers you will need to configure the nodes manually.
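When multicast discovery is unavailable, the cluster members can be listed statically. A minimal sketch of /etc/rabbitmq/rabbitmq.config, assuming two nodes named rabbit@node1 and rabbit@node2 (the hostnames are placeholders, and the exact cluster_nodes format varies between RabbitMQ versions, so check the documentation for your release):

```erlang
[
  {rabbit, [
    %% Nodes this broker will try to contact and cluster with on startup.
    {cluster_nodes, ['rabbit@node1', 'rabbit@node2']}
  ]}
].
```

Note that the file is an Erlang term list, so the trailing period on the closing bracket is required.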

Most likely you will want to set up a floating VIP between the servers so shipper agents can talk to a single address and do not need to be reconfigured when a server is taken offline. This is easily done using Heartbeat or Pacemaker.

LogStash Shipper to RabbitMQ

LogStash works well with RabbitMQ out of the box even in an HA setup. This is because the exchanges themselves do not need to be HA; as long as there is an active queue, all messages will be routed to it.

output {
  amqp {
    host => "10.0.123.30"
    exchange_type => "fanout"
    name => "rawlogs"
  }
}

Note that in my setup 10.0.123.30 is the floating VIP for my RabbitMQ cluster.

name => "rawlogs" indicates the exchange being published to. rawlogs then forwards to the rawlogs_consumer queue, where the indexers pick up the messages.

If you are working on a setup with multiple clusters you may want to change the exchange name to keep customer information separated.
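As a sketch, a per-customer shipper output might only differ in the exchange name (the "rawlogs_customer_a" name here is hypothetical):

```text
output {
  amqp {
    host => "10.0.123.30"
    exchange_type => "fanout"
    name => "rawlogs_customer_a"
  }
}
```

Each customer's indexers would then bind their own queue to the matching exchange.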

LogStash Indexer to RabbitMQ

LogStash by default does not work well in an HA setup with RabbitMQ. After patching the system, the following configuration should ensure that messages are neither lost nor duplicated:

input {
  amqp {
    type => "all"
#   host => "127.0.0.1"
    host => "10.0.123.30"
    exchange => "rawlogs"
    name => "rawlogs_consumer"
    durable => "true"
    exclusive => "false"
    auto_delete => "false"
    arguments => [ "x-ha-policy", "all" ]
  }
}

host is the IP of the RabbitMQ server. I left 127.0.0.1 in, commented out, to show that you can run RabbitMQ and the indexer on the same box.

The only addition needed for HA is the arguments setting. We set x-ha-policy to tell RabbitMQ to mirror the queue's data to all nodes. This setting requires a patch to work:

@@ -14,6 +14,9 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
   config_name "amqp"
   plugin_status "unstable"
 
+  # Your amqp broker's custom arguments. For mirrored queues in RabbitMQ: [ "x-ha-policy", "all" ]
+  config :arguments, :validate => :array, :default => []
+
   # Your amqp server address
   config :host, :validate => :string, :required => true
 
@@ -26,7 +29,7 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
   # Your amqp password
   config :password, :validate => :password, :default => "guest"
 
-  # The name of the queue. 
+  # The name of the queue.
   config :name, :validate => :string, :default => ''
 
   # The name of the exchange to bind the queue.
@@ -108,7 +111,9 @@ def run(queue)
       @bunny.start
       @bunny.qos({:prefetch_count => @prefetch_count})
 
-      @queue = @bunny.queue(@name, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive})
+      @arguments_hash = Hash[*@arguments]
+
+      @queue = @bunny.queue(@name, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :arguments => @arguments_hash })
       @queue.bind(@exchange, :key => @key)
       @queue.subscribe({:ack => @ack}) do |data|
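The key line in the patch is Hash[*@arguments]: the plugin config supplies the broker arguments as a flat [key, value, ...] array, and the splat-into-Hash idiom converts that into the hash Bunny's queue() call expects. A minimal sketch of just that conversion:

```ruby
# The flat key/value array as written in the LogStash config.
arguments = ["x-ha-policy", "all"]

# Hash[*array] pairs up consecutive elements into key => value entries.
arguments_hash = Hash[*arguments]
# arguments_hash => {"x-ha-policy" => "all"}
```

This is why the arguments list in the config must have an even number of elements; an odd-length array would raise an ArgumentError when building the hash.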