#!/usr/bin/env python
# TODO: (FEAT) add graphite - https://github.com/daniellawrence/graphitesend
# TODO: https://github.com/globocom/tornado-es
import pika
import logging
import os
from abc import abstractmethod, ABCMeta
from datetime import date
from time import sleep
# TRANSPORT SPECIFIC
from logentries import LogentriesHandler
import pymongo
from elasticsearch import Elasticsearch
from influxdb import client as influxdb
import urllib2
# import pretty table for statistical data visualization
from prettytable import PrettyTable
# FILE DEFAULTS
DEFAULT_FILE_PATH = 'generated.log'
DEFAULT_MAXBYTES = 10000000
DEFAULT_BACKUPS = 20
# AMQP DEFAULTS
# queue name for packager events
DEFAULT_AMQP_QUEUE = ''
# broker exchange
DEFAULT_AMQP_EXCHANGE = ''
DEFAULT_AMQP_DELIVERY_MODE = 2
# ES DEFAULTS
DEFAULT_ES_PORT = '9200'
DEFAULT_ES_URL_PREFIX = ''
DEFAULT_ES_TIMEOUT = '10'
d = date.today()
DEFAULT_ES_INDEX = 'logstash-{0}.{1}.{2}'.format(d.year, d.month, d.day)
DEFAULT_ES_DOC_TYPE = 'doc'
DEFAULT_ES_SLEEP = 3
# LOGGLY DEFAULTS
DEFAULT_LOGGLY_DOMAIN = 'logs-01.loggly.com'
# MONGO DEFAULTS
DEFAULT_MONGO_PORT = 27017
DEFAULT_MONGO_DB = 'test'
DEFAULT_MONGO_COLLECTION = 'my_collection'
DEFAULT_MONGO_SLEEP = 1
# INFLUX DEFAULTS
DEFAULT_INFLUX_PORT = 8086
DEFAULT_INFLUX_DB = 'metrics'
DEFAULT_INFLUX_USER = 'root'
DEFAULT_INFLUX_PASSWORD = 'root'
class BaseTransport(object):
    """base class for all transport implementations"""
    __metaclass__ = ABCMeta

    @abstractmethod
    def __init__(self, config):
        return

    @abstractmethod
    def send(self, client, log):
        return
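
# A minimal sketch of a custom transport built on BaseTransport. It assumes
# the calling framework hands ``send`` a pre-configured client plus an
# iterable of formatted log lines; the class and config key below are
# illustrative only.
#
#     class PrintTransport(BaseTransport):
#         def __init__(self, config):
#             self.prefix = config.get('prefix', '')
#
#         def send(self, client, data):
#             for piece in data:
#                 print(self.prefix + piece)
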
class FileTransport(BaseTransport):
    """a RotatingFileHandler transport implementation"""
    def __init__(self, config):
        self.file_path = config.get('file', DEFAULT_FILE_PATH)
        self.max_bytes = config.get('max_bytes', DEFAULT_MAXBYTES)
        self.backups_count = config.get('backups_count', DEFAULT_BACKUPS)

    def send(self, client, data):
        for piece in data:
            client.info(piece)

    def get_data(self):
        # https://code.google.com/p/prettytable/wiki/Tutorial
        # you can also get data from csv's, html, etc...
        # for instance, you could query elasticsearch right here, retrieve
        # the data and compare it with what was written, to verify that all
        # logs you wrote actually made it through.
        data_table = PrettyTable(["Type", "Value"])
        # make sure the "Type" column is always aligned to the left
        data_table.align["Type"] = "l"
        # one space of padding around every cell
        data_table.padding_width = 1
        with open(self.file_path) as f:
            # populate the table
            # TODO: (BUG) fix bug where rotating files will return a bad line
            # TODO: (BUG) count if more than one file is generated
            data_table.add_row(["lines written", sum(1 for _ in f)])
        return data_table
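
# Illustrative usage sketch for FileTransport, assuming the client is a
# logger backed by logging.handlers.RotatingFileHandler built from the same
# config values (the logger name and sample data are hypothetical):
#
#     import logging.handlers
#     transport = FileTransport({'file': 'generated.log'})
#     handler = logging.handlers.RotatingFileHandler(
#         transport.file_path,
#         maxBytes=transport.max_bytes,
#         backupCount=transport.backups_count)
#     client = logging.getLogger('file_transport')
#     client.setLevel(logging.INFO)
#     client.addHandler(handler)
#     transport.send(client, ['log line 1', 'log line 2'])
#     print(transport.get_data())
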
class AMQPTransport(BaseTransport):
    """an amqp transport implementation"""
    def __init__(self, config):
        try:
            self.host = config['host']
        except KeyError as ex:
            raise RuntimeError('configuration incomplete: {0}'.format(ex))
        self.queue = config.get('queue', DEFAULT_AMQP_QUEUE)
        self.exchange = config.get('exchange', DEFAULT_AMQP_EXCHANGE)
        self.exchange_type = config.get('exchange_type', 'fanout')
        self.routing_key = config.get('routing_key', self.queue)
        self.delivery_mode = config.get('deliver_mode',
                                        DEFAULT_AMQP_DELIVERY_MODE)
        self.auto_delete = config.get('auto_delete', True)
        self.durable = config.get('durable', True)
        self.exclusive = config.get('exclusive', False)

    def send(self, client, data):
        for piece in data:
            client.basic_publish(exchange=self.exchange,
                                 routing_key=self.routing_key,
                                 body=piece,
                                 properties=pika.BasicProperties(
                                     delivery_mode=self.delivery_mode))

    # TODO: (FEAT) support connection closing
    def close(self):
        self.connection.close()
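
# Illustrative usage sketch for AMQPTransport, assuming the client is a pika
# channel opened against the configured broker (queue name, message body and
# setup below are hypothetical; the real client wiring happens elsewhere):
#
#     transport = AMQPTransport({'host': 'localhost', 'queue': 'pkg_events'})
#     connection = pika.BlockingConnection(
#         pika.ConnectionParameters(host=transport.host))
#     channel = connection.channel()
#     channel.queue_declare(queue=transport.queue,
#                           durable=transport.durable,
#                           exclusive=transport.exclusive,
#                           auto_delete=transport.auto_delete)
#     transport.send(channel, ['{"event": "packaged"}'])
#     connection.close()
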
class UDPTransport(BaseTransport):
    """a udp transport implementation"""
    def __init__(self, config):
        try:
            self.host = config['host']
            self.port = config['port']
        except KeyError as ex:
            raise RuntimeError('configuration not complete: {0}'.format(ex))

    def send(self, client, data):
        for piece in data:
            client.debug(piece)
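
# Illustrative usage sketch for UDPTransport, assuming the client is a logger
# with a logging.handlers.DatagramHandler pointed at the configured host and
# port (this handler choice is an assumption; names are hypothetical):
#
#     import logging.handlers
#     transport = UDPTransport({'host': 'localhost', 'port': 9999})
#     client = logging.getLogger('udp_transport')
#     client.setLevel(logging.DEBUG)
#     client.addHandler(
#         logging.handlers.DatagramHandler(transport.host, transport.port))
#     transport.send(client, ['log line over udp'])
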
class StreamTransport(BaseTransport):
    """a shell stream transport implementation"""
    def __init__(self, config):
        pass

    def send(self, client, data):
        for piece in data:
            client.debug(piece)
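
# Illustrative usage sketch for StreamTransport, assuming the client is a
# logger that writes to stdout through logging.StreamHandler (hypothetical
# names; the actual client is configured elsewhere):
#
#     import sys
#     transport = StreamTransport({})
#     client = logging.getLogger('stream_transport')
#     client.setLevel(logging.DEBUG)
#     client.addHandler(logging.StreamHandler(sys.stdout))
#     transport.send(client, ['log line to stdout'])
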
class ElasticsearchTransport(BaseTransport):
    """an Elasticsearch transport implementation"""
    def __init__(self, config):
        try:
            self.host = config.get('host')
            self.port = config.get('port', DEFAULT_ES_PORT)
            self.url_prefix = config.get('url_prefix', DEFAULT_ES_URL_PREFIX)
            self.timeout = config.get('timeout', DEFAULT_ES_TIMEOUT)
            self.index = config.get('index', DEFAULT_ES_INDEX)
            self.doc_type = config.get('doc_type', DEFAULT_ES_DOC_TYPE)
            self.sleep = config.get('sleep', DEFAULT_ES_SLEEP)
        except KeyError as ex:
            raise RuntimeError('configuration not complete: {0}'.format(ex))

    def send(self, client, data):
        for piece in data:
            client.index(
                index=self.index,
                doc_type=self.doc_type,
                body=piece
            )

    def get_data(self):
        sleep(self.sleep)
        data_table = PrettyTable(["Type", "Value"])
        data_table.align["Type"] = "l"
        data_table.padding_width = 1
        self.current_docs = self.indices_client.stats(
            index=self.index)['_all']['total']['docs']['count']
        data_table.add_row(["docs before", self.docs])
        data_table.add_row(["docs after", self.current_docs])
        data_table.add_row(["docs written", self.current_docs - self.docs])
        return data_table
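
# Illustrative usage sketch for ElasticsearchTransport, assuming the client
# is an elasticsearch-py Elasticsearch instance built from the same config.
# Names are hypothetical, and ``indices_client`` and ``docs`` used by
# get_data are assumed to be set on the transport by the caller beforehand.
#
#     transport = ElasticsearchTransport({'host': 'localhost'})
#     client = Elasticsearch(
#         [{'host': transport.host, 'port': int(transport.port)}])
#     transport.send(client, ['{"message": "hello es"}'])
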
class LogentriesTransport(BaseTransport):
    """a logentries transport implementation"""
    def __init__(self, config):
        try:
            self.token = config['token']
        except KeyError as ex:
            raise RuntimeError('configuration not complete: {0}'.format(ex))

    def send(self, client, data):
        for piece in data:
            client.debug(piece)
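
# Illustrative usage sketch for LogentriesTransport, assuming the client is
# a logger carrying a LogentriesHandler with the configured token (the token
# value and logger name below are placeholders):
#
#     transport = LogentriesTransport({'token': 'LOGENTRIES-TOKEN'})
#     client = logging.getLogger('logentries_transport')
#     client.setLevel(logging.DEBUG)
#     client.addHandler(LogentriesHandler(transport.token))
#     transport.send(client, ['log line shipped to logentries'])
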
class LogglyTransport(BaseTransport):
    """a Loggly transport implementation"""
    # TODO: (IMPRV) check out https://github.com/kennedyj/loggly-handler/
    def __init__(self, config):
        try:
            self.domain = config.get('url', DEFAULT_LOGGLY_DOMAIN)
            self.token = config['token']
        except KeyError as ex:
            raise RuntimeError('configuration not complete: {0}'.format(ex))

    def send(self, client, data):
        for piece in data:
            piece = "PLAINTEXT=" + urllib2.quote(piece)
            urllib2.urlopen(client, piece)
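
# Illustrative usage sketch for LogglyTransport, assuming the client passed
# to ``send`` is the Loggly HTTP inputs URL built from the configured domain
# and token (the URL layout is an assumption; the token is a placeholder):
#
#     transport = LogglyTransport({'token': 'LOGGLY-TOKEN'})
#     client = 'https://{0}/inputs/{1}'.format(
#         transport.domain, transport.token)
#     transport.send(client, ['log line shipped to loggly'])
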
class MongoDBTransport(BaseTransport):
    """a MongoDB transport implementation"""
    def __init__(self, config):
        try:
            self.host = config['host']
            self.port = config.get('port', DEFAULT_MONGO_PORT)
            self.db = config.get('db', DEFAULT_MONGO_DB)
            self.collection = config.get(
                'collection', DEFAULT_MONGO_COLLECTION)
            self.sleep = config.get('sleep', DEFAULT_MONGO_SLEEP)
        except KeyError as ex:
            raise RuntimeError('configuration not complete: {0}'.format(ex))

    def send(self, client, data):
        for piece in data:
            client.save(piece)

    def get_data(self):
        sleep(self.sleep)
        data_table = PrettyTable(["Type", "Value"])
        data_table.align["Type"] = "l"
        data_table.padding_width = 1
        current_docs = self.collection_client.count()
        data_table.add_row(["docs before", self.docs])
        data_table.add_row(["docs after", current_docs])
        data_table.add_row(["docs written", current_docs - self.docs])
        return data_table
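
# Illustrative usage sketch for MongoDBTransport, assuming the client is a
# pymongo collection object and each piece is a dict-like document. Names
# are hypothetical, and ``collection_client`` and ``docs`` used by get_data
# are assumed to be set on the transport by the caller beforehand.
#
#     transport = MongoDBTransport({'host': 'localhost'})
#     mongo = pymongo.MongoClient(transport.host, transport.port)
#     collection = mongo[transport.db][transport.collection]
#     transport.send(collection, [{'message': 'hello mongo'}])
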
class InfluxDBTransport(BaseTransport):
    """an InfluxDB transport implementation"""
    def __init__(self, config):
        try:
            self.host = config['host']
            self.port = config.get('port', DEFAULT_INFLUX_PORT)
            self.username = config.get('username', DEFAULT_INFLUX_USER)
            self.password = config.get('password', DEFAULT_INFLUX_PASSWORD)
            self.database = config.get('database', DEFAULT_INFLUX_DB)
        except KeyError as ex:
            raise RuntimeError('configuration not complete: {0}'.format(ex))

    def send(self, client, data):
        for piece in data:
            client.write_points([piece])

    def get_data(self):
        pass
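
# Illustrative usage sketch for InfluxDBTransport, assuming the client is an
# InfluxDBClient from the imported ``influxdb.client`` module and that each
# piece is already shaped as a point the client accepts. The point layout
# below follows the old 0.8-style API and is an assumption.
#
#     transport = InfluxDBTransport({'host': 'localhost'})
#     client = influxdb.InfluxDBClient(transport.host, transport.port,
#                                      transport.username,
#                                      transport.password,
#                                      transport.database)
#     transport.send(client, [{'name': 'events',
#                              'columns': ['value'],
#                              'points': [[1]]}])
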