#!/usr/bin/env ruby

# Juggernaut Push server

require "socket"
require "thread"
require "logger"
require "yaml"
require "monitor"

require "rubygems"
require 'eventmachine'
require "json"
require 'json/add/rails'

module Juggernaut
  # Application root: the parent of the directory that contains this script.
  RAILS_ROOT = File.join(File.dirname(__FILE__), '..')

  # Server settings read once at load time from config/juggernaut_config.yml
  # under RAILS_ROOT. Only the top-level object is frozen.
  CONFIG = YAML.load_file(File.join(RAILS_ROOT, "config", "juggernaut_config.yml")).freeze

  # Registry of Channel objects keyed by channel name.
  #
  # The registry is itself a Monitor; every method that changes the
  # name => Channel hash does so inside +synchronize+.
  class Channels < Monitor

    attr_accessor :channels

    def initialize
      super()
      @channels = {}
    end

    # Subscribes +connection+ to the channel called +channel_name+, creating
    # the channel on first use.
    def subscribe(channel_name, connection)
      synchronize do
        @channels[channel_name] ||= Channel.new(channel_name)
        @channels[channel_name].subscribe(connection)
      end
    end

    # Removes +connection+ from the named channel and drops the channel from
    # the registry once it has no subscribers. Unknown names are ignored.
    def unsubscribe(channel_name, connection)
      synchronize do
        existing = @channels[channel_name]
        next unless existing

        existing.unsubscribe(connection)
        @channels.delete(channel_name) if existing.subscribers.empty?
      end
    end

    # Removes +connection+ from every channel, discarding channels that end
    # up without subscribers.
    def unsubscribe_all(connection)
      synchronize do
        @channels.delete_if do |_name, channel|
          channel.unsubscribe(connection)
          channel.subscribers.empty?
        end
      end
    end

    # Sends +data+ to the named channel, if it exists. Only the lookup happens
    # under the registry lock; the broadcast itself runs outside it.
    def broadcast(channel_name, data)
      target = synchronize { @channels[channel_name] }
      target.broadcast(data) if target
    end

    # Number of channels currently registered.
    def count
      @channels.size
    end

  end

  # A named channel together with the connections subscribed to it.
  #
  # The channel is a Monitor; all access to the subscriber list made by this
  # class happens inside +synchronize+, so the snapshot taken by #broadcast
  # cannot observe a half-applied subscribe/unsubscribe.
  class Channel < Monitor

    attr_accessor :subscribers
    attr_reader   :name

    def initialize(name)
      @name = name
      @subscribers = []
      super()
    end

    # Adds +connection+ unless it is already subscribed.
    # Returns the subscriber list when added, nil when it was already present.
    def subscribe(connection)
      synchronize do
        @subscribers << connection unless @subscribers.include?(connection)
      end
    end

    # Removes +connection+. Returns the removed connection, or nil when it
    # was not subscribed.
    def unsubscribe(connection)
      synchronize do
        @subscribers.delete(connection)
      end
    end

    # Sends +data+ to every current subscriber via its +send_data+ method.
    # A subscriber whose +send_data+ raises is unsubscribed and the failure is
    # logged; delivery to the remaining subscribers continues.
    def broadcast(data)
      # Work on a snapshot so sending happens outside the lock.
      receivers = synchronize { @subscribers.dup }
      Logger.info "Broadcasting to channel: #{@name} [#{receivers.size} subscribers]"
      receivers.each do |receiver|
        begin
          receiver.send_data(data)
        rescue => e
          # Drop the subscriber before logging so a logging problem can never
          # leave a dead connection registered.
          unsubscribe(receiver)
          # Pass a String: the logger capitalizes its argument, which an
          # Exception object does not support.
          Logger.warn e.message
        end
      end
    end
  end


  # Connection handler passed to EventMachine::start_server.
  #
  # Incoming bytes are accumulated in a per-connection buffer. Once a chunk
  # ends with a terminator the buffer is parsed either as a Flash
  # cross-domain policy request or as a JSON request. A JSON request with
  # "broadcast" == 1 pushes "message" to the listed "channels"; any other
  # request subscribes this connection to the listed "channels".
  #
  # Malformed requests (non-object JSON, missing or wrongly typed fields) are
  # logged and ignored instead of raising out of the callback.
  module PushServer

    # Buffers +data+ and dispatches the buffer when the chunk ends with
    # NUL, LF or CR.
    def receive_data(data)
      @line_buffer << data
      if ["\0", "\n", "\r"].include?(data[-1, 1])
        dispatch_request
      else
        Logger.debug "non-complete data received"
      end
    end

    # Sets up per-connection state and lazily creates the channel registry
    # shared by all connections.
    def post_init
      @@channels ||= Channels.new
      @line_buffer = ''

      peer = get_peername
      @port, @host = Socket.unpack_sockaddr_in( peer ) unless peer.nil?
      Logger.info "#{@host}:#{@port} connected."

    end

    # Handles whatever is still buffered, then removes this connection from
    # every channel.
    def unbind
      Logger.info "#{@host}:#{@port} closed."
      dispatch_request
      @@channels.unsubscribe_all(self)
    end

    # Parses the buffered request and routes it to #broadcast or #subscribe.
    def dispatch_request
      Logger.debug "dispatch_request"
      request = parse_request
      Logger.debug  "Handling request: #{request.inspect}"
      return unless request

      # Valid JSON is not necessarily an object; anything else cannot be
      # indexed by the string keys used below.
      unless request.is_a?(Hash)
        Logger.warn "Ignoring non-object request [#{request.inspect}]"
        return
      end

      if request['broadcast'] == 1
        broadcast(request)
      else
        subscribe(request)
      end
    end

    # Sends request['message'] followed by a NUL byte to every channel named
    # in request['channels']. Requires an Array of channels and a String
    # message; anything else is logged and ignored.
    def broadcast(request)
      channels = request['channels']
      message  = request['message']
      unless channels.is_a?(Array) && message.is_a?(String)
        Logger.warn "Ignoring malformed broadcast request [#{request.inspect}]"
        return
      end

      Logger.debug  "Broadcasting to #{channels.join(', ')}"
      channels.each do |channel_name|
        @@channels.broadcast(channel_name, message + "\0")
      end
    end

    # Subscribes this connection to every channel named in
    # request['channels']. A request without channels is silently ignored;
    # a non-Array value is logged and ignored.
    def subscribe(request)
      channels = request['channels']
      return if channels.nil?
      unless channels.is_a?(Array)
        Logger.warn "Ignoring malformed subscribe request [#{request.inspect}]"
        return
      end

      Logger.info  "Subscribing to channels #{channels.join(', ')}"
      channels.each do |channel_name|
        @@channels.subscribe(channel_name, self)
      end
      Logger.info "Channels: #{@@channels.count}"
    end

    # Consumes the line buffer. Answers a policy-file request directly and
    # returns nil for it; otherwise returns the parsed JSON value, or nil
    # when the buffer is empty or is not valid JSON.
    def parse_request
      @line_buffer.strip!
      request = nil

      Logger.debug "@line_buffer[#{@line_buffer}]"
      unless @line_buffer.empty?
        if @line_buffer.include? "<policy-file-request/>"
          Logger.debug "Sending policy file.."
          self.send_data "<?xml version=\"1.0\"?>\n<cross-domain-policy>\n<allow-access-from domain=\"#{Juggernaut::CONFIG["ALLOW_CROSSDOMAIN"]}\" to-ports=\"*\"/>\n<allow-access-from domain=\"192.168.*\" to-ports=\"*\"/>\n</cross-domain-policy>\n\0"
        else
          begin
            # NOTE(review): this file loads json/add/rails and the input is
            # untrusted. Depending on the json gem version, JSON.parse may
            # honour a "json_class" key and instantiate classes - confirm
            # that additions are disabled for parsing here.
            request = JSON.parse(@line_buffer)
          rescue
            Logger.warn "Unable to parse [#{@line_buffer}]"
            request = nil
          end
        end
        # The buffer is reset whether or not parsing succeeded.
        @line_buffer = ''
      end
      request
    end
  end


  # Small logging facade: every message is forwarded to the global $LOG
  # logger, and info/warn messages are additionally echoed to standard
  # output with a timestamp.
  class Logger
    # Forwards +msg+ to $LOG at debug level (no console echo).
    def self.debug(msg)
      $LOG.debug msg
    end

    # Echoes +msg+ to the console and forwards it to $LOG at info level.
    def self.info(msg)
      console "INFO  #{msg}"
      $LOG.info msg
    end

    # Echoes +msg+ to the console and forwards it to $LOG at warn level.
    # +msg+ may be any object (callers pass exceptions as well as strings);
    # it is converted with to_s for the console line. Note that
    # String#capitalize also downcases the rest of the text.
    def self.warn(msg)
      console "WARN  #{msg.to_s.capitalize}"
      $LOG.warn msg
    end

    # Prints +text+ prefixed with the current time (mm/dd/yy HH:MM:SS).
    def self.console(text)
      puts Time.now.strftime("%m/%d/%y %H:%M:%S ") + text
    end
    private_class_method :console

  end


end

# Process-wide log sink used by Juggernaut::Logger. The path is anchored to
# the application root (like the config file) so the server can be started
# from any working directory. Logger.new is given a shift age of 2 and a
# shift size of 10 KiB.
$LOG = Logger.new(File.join(Juggernaut::RAILS_ROOT, "log", "push_server.log"), 2, 10*1024)
@hostname = Juggernaut::CONFIG["PUSH_HOST"]
@port = Juggernaut::CONFIG["PUSH_PORT"]
Juggernaut::Logger.info("Starting up on #{@hostname}:#{@port}...")

# Run the reactor; every accepted connection is handled by
# Juggernaut::PushServer.
EventMachine::run {
  EventMachine::start_server @hostname, @port.to_i, Juggernaut::PushServer
}
