/* Mantra - http://www.neuromancy.net/mantra * (c) 2007-2009 - The Neuromancy Society * * This code is released under the Artistic License 2.0. If you have not * received a copy of the license with this source code, you may find * this license at: * http://www.opensource.org/licenses/artistic-license-2.0.php * * $Id: hello_world_task.cpp 618 2010-03-11 13:23:03Z prez $ */ #include #include #include #include #include #include #include #include #include #include #include #include using namespace mantra; using namespace mantra::scheduler; class hello_world : public active_task, public passive_task { public: hello_world(const std::string &name, const std::string &recipient, const std::string &destination); void kickoff() { int key = (rand() % 8); boost::shared_ptr m = create_message(); m->routing()->set_destination(destination_); m->routing()->set_recipient(recipient_); (*m->meta())["KEY"] = key; boost::posix_time::ptime pt = boost::posix_time::microsec_clock::universal_time(); (*m->payload()) = pt; try { mantra::scheduler::scheduler::instance().route(m, true); } catch (std::exception &e) { fprintf(stderr, "EXCEPTION: %s\n", e.what()); } } void operator()(const boost::function &check) { srand(time(NULL)); printf("THREAD START ... (%d)\n", check()); while (check()) { printf("Still Going ...\n"); kickoff(); sleep(1); } printf("THREAD STOP ...\n"); } void handle(const boost::shared_ptr &msg, bool block) { boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); boost::posix_time::ptime pt = boost::any_cast(*msg->payload()); printf("Received message (%d: %s) [%s]:\n", boost::get((*msg->meta())["KEY"]), boost::lexical_cast(msg->id()).c_str(), boost::lexical_cast(mantra::duration(pt, now)).c_str()); for (size_t i = 0; i < msg->routing()->hops().size(); ++i) printf(" %s (%d)\n", msg->routing()->hops()[i].name.c_str(), msg->routing()->hops()[i].task); } private: std::string recipient_; std::string destination_; }; class delay : public passive_task { unsigned int wait_; public: delay(const std::string &name, unsigned int wait) : task(name), passive_task(name), wait_(wait) { } void handle(const boost::shared_ptr &msg, bool block) { msg->status("DELAYED!"); boost::posix_time::ptime pt = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::seconds(wait_); message::pred_guard pg(msg->guard_interrupt()); boost::mutex mtx; boost::mutex::scoped_lock sl(mtx); boost::condition cond; cond.timed_wait(sl, pt); mantra::scheduler::scheduler::instance().route(msg, true); } }; class except : public passive_task { public: except(const std::string &name) : task(name), passive_task(name) { } void handle(const boost::shared_ptr &msg, bool block) { throw std::runtime_error("test exeption!"); } }; class command_hw_event : public mantra::admin::command { hello_world &hw; public: command_hw_event(hello_world &h) : command(INTL_N("Schedule an event"), "mantra"), hw(h) {} mantra::admin::command::return_type operator()( const boost::shared_ptr &sess, std::vector &prev, std::string &rest, const boost::shared_ptr &ctx = boost::shared_ptr()) { MT_FB(sess << prev << rest << ctx); mantra::admin::command::return_type rv = std::make_pair(mantra::admin::command::no_error, boost::shared_ptr()); boost::shared_ptr e = mantra::scheduler::scheduler::instance().get_task("EVENT"); if (!e) { sess->err(INTL("Task EVENT is not a known task.")); MT_RET(rv); } mantra::scheduler::event_task *et = dynamic_cast(e.get()); if (!et) { sess->err(INTL("Task EVENT is not an event task.")); MT_RET(rv); } try { long seconds = boost::lexical_cast(rest); unsigned int id = et->schedule("Hello World event", boost::bind(&hello_world::kickoff, &hw), mantra::duration(boost::posix_time::seconds(seconds))); if (id) sess->out(boost::format(INTL("Event scheduled (%d).")) % id); else sess->err(INTL("Error scheduling event.")); } catch (boost::bad_lexical_cast &) { sess->err(INTL("Number of seconds for schedule required.")); } MT_FE(rv); } }; class command_hw_direct : public mantra::admin::command { hello_world &hw; public: command_hw_direct(hello_world &h) : command(INTL_N("Inject an event"), "mantra"), hw(h) {} mantra::admin::command::return_type operator()( const boost::shared_ptr &sess, std::vector &prev, std::string &rest, const boost::shared_ptr &ctx = boost::shared_ptr()) { MT_FB(sess << prev << rest << ctx); mantra::admin::command::return_type rv = std::make_pair(mantra::admin::command::no_error, boost::shared_ptr()); hw.kickoff(); sess->out(INTL("Event injected.")); MT_FE(rv); } }; inline hello_world::hello_world(const std::string &name, const std::string &recipient, const std::string &destination) : task(name), active_task(name), passive_task(name), recipient_(recipient), destination_(destination) { selector->add("EVENT", "event", boost::shared_ptr(new command_hw_event(*this))); selector->add("(DIRECT|INJECT)", "inject", boost::shared_ptr(new command_hw_direct(*this))); } static task *create_hello_world(const boost::property_tree::iptree &cfg) { std::string name = cfg.get(".name", cfg.get("name", std::string())); if (name.empty()) return NULL; std::string recipient = cfg.get("recipient", std::string()); std::string destination = cfg.get("destination", std::string()); return new hello_world(name, recipient, destination); } static mantra::scheduler::scheduler::task_factory::registrar register_hello_world( mantra::scheduler::scheduler_module::get().factory(), "hello_world", &create_hello_world); static task *create_delay(const boost::property_tree::iptree &cfg) { std::string name = cfg.get(".name", cfg.get("name", std::string())); if (name.empty()) return NULL; unsigned int wait = cfg.get("delay", 10); return new delay(name, wait); } static mantra::scheduler::scheduler::task_factory::registrar register_delay( mantra::scheduler::scheduler_module::get().factory(), "delay", &create_delay); static task *create_except(const boost::property_tree::iptree &cfg) { std::string name = cfg.get(".name", cfg.get("name", std::string())); if (name.empty()) return NULL; return new except(name); } static mantra::scheduler::scheduler::task_factory::registrar register_except( mantra::scheduler::scheduler_module::get().factory(), "except", &create_except); static void hw_msg_func(const boost::shared_ptr &msg) { printf("Test func executed on %s\n", boost::lexical_cast(msg->id()).c_str()); } static mantra::scheduler::message::module_functions::registrar register_hw_func( mantra::scheduler::scheduler_module::get().functions(), "hw_func", &hw_msg_func); extern "C" { void handle_async_exception(active_task *task, const std::exception &e) { printf("Got Hello World Exception!\n"); throw e; } }