Implement listening
Signed-off-by: Rahix <rahix@rahix.de>
This commit is contained in:
parent
8fb07e22cc
commit
719201e950
3 changed files with 215 additions and 6 deletions
82
bc.lua
82
bc.lua
|
|
@ -17,16 +17,18 @@ local Message = {
|
||||||
ListenCancel = 0x4c535450,
|
ListenCancel = 0x4c535450,
|
||||||
}
|
}
|
||||||
|
|
||||||
local function query_param(self, v)
|
local query_param = {
|
||||||
|
__call=function(self, v)
|
||||||
return {ty=self.ty, v=v}
|
return {ty=self.ty, v=v}
|
||||||
end
|
end,
|
||||||
|
}
|
||||||
local Query = {
|
local Query = {
|
||||||
Change = {ty=0x434847},
|
Change = {ty=0x434847},
|
||||||
Rising = {ty=0x524953},
|
Rising = {ty=0x524953},
|
||||||
Falling = {ty=0x46414c},
|
Falling = {ty=0x46414c},
|
||||||
Equals = setmetatable({ty=0x455155}, {__call=query_param}),
|
Equals = setmetatable({ty=0x455155, invalid=true}, query_param),
|
||||||
Above = setmetatable({ty=0x414256}, {__call=query_param}),
|
Above = setmetatable({ty=0x414256, invalid=true}, query_param),
|
||||||
Below = setmetatable({ty=0x424c4f}, {__call=query_param}),
|
Below = setmetatable({ty=0x424c4f, invalid=true}, query_param),
|
||||||
}
|
}
|
||||||
|
|
||||||
-- Network ---------------------------------------------------------------- {{{
|
-- Network ---------------------------------------------------------------- {{{
|
||||||
|
|
@ -99,12 +101,14 @@ local BaseControl = {
|
||||||
Network = Network,
|
Network = Network,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
-- Lifecycle {{{
|
||||||
function BaseControl:new(network)
|
function BaseControl:new(network)
|
||||||
local self = {
|
local self = {
|
||||||
local_nouns = {},
|
local_nouns = {},
|
||||||
local_verbs = {},
|
local_verbs = {},
|
||||||
remote_nouns = {},
|
remote_nouns = {},
|
||||||
remote_verbs = {},
|
remote_verbs = {},
|
||||||
|
listeners = {},
|
||||||
|
|
||||||
live = false,
|
live = false,
|
||||||
network = network or Network:new(),
|
network = network or Network:new(),
|
||||||
|
|
@ -222,6 +226,7 @@ function BaseControl:close()
|
||||||
|
|
||||||
setmetatable(self, nil)
|
setmetatable(self, nil)
|
||||||
end
|
end
|
||||||
|
-- }}}
|
||||||
|
|
||||||
-- Nouns {{{
|
-- Nouns {{{
|
||||||
function BaseControl:has_noun(noun)
|
function BaseControl:has_noun(noun)
|
||||||
|
|
@ -272,7 +277,31 @@ function BaseControl:set(name, value)
|
||||||
if type(value) == "function" then
|
if type(value) == "function" then
|
||||||
error("\""..name.."\" can't be cast into a verb")
|
error("\""..name.."\" can't be cast into a verb")
|
||||||
else
|
else
|
||||||
|
local old = self.local_nouns[name]
|
||||||
self.local_nouns[name] = value
|
self.local_nouns[name] = value
|
||||||
|
|
||||||
|
if self.listeners[name] ~= nil then
|
||||||
|
for id, l in pairs(self.listeners[name]) do
|
||||||
|
if (l.query.ty == Query.Change.ty and value ~= old)
|
||||||
|
or (l.query.ty == Query.Rising.ty and value > old)
|
||||||
|
or (l.query.ty == Query.Falling.ty and value < old)
|
||||||
|
or (l.query.ty == Query.Equals.ty and value == l.query.v)
|
||||||
|
or (l.query.ty == Query.Above.ty and value > l.query.v)
|
||||||
|
or (l.query.ty == Query.Below.ty and value < l.query.v)
|
||||||
|
then
|
||||||
|
if l.callback ~= nil then
|
||||||
|
l.callback(value)
|
||||||
|
else
|
||||||
|
self.network:send(l.addr, {
|
||||||
|
ty=Message.ListenNotify,
|
||||||
|
noun=name,
|
||||||
|
id=id,
|
||||||
|
value=value,
|
||||||
|
})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
elseif self.local_verbs[name] ~= nil then
|
elseif self.local_verbs[name] ~= nil then
|
||||||
if type(value) ~= "function" then
|
if type(value) ~= "function" then
|
||||||
|
|
@ -347,6 +376,35 @@ function BaseControl:call_sync(verb, ...)
|
||||||
end
|
end
|
||||||
-- }}}
|
-- }}}
|
||||||
|
|
||||||
|
-- Listening {{{
|
||||||
|
function BaseControl:listen(noun, query, callback)
|
||||||
|
if query.invalid then
|
||||||
|
error("invalid query, forgot a parameter?")
|
||||||
|
end
|
||||||
|
if self.listeners[noun] == nil then
|
||||||
|
self.listeners[noun] = {}
|
||||||
|
end
|
||||||
|
local id = uuid.next()
|
||||||
|
self.listeners[noun][id] = {
|
||||||
|
query=query,
|
||||||
|
callback=callback,
|
||||||
|
}
|
||||||
|
if self.local_nouns[noun] ~= nil then
|
||||||
|
return true
|
||||||
|
elseif self.remote_nouns[noun] ~= nil then
|
||||||
|
self.network:send(self.remote_nouns[noun], {
|
||||||
|
ty=Message.ListenRequest,
|
||||||
|
noun=noun,
|
||||||
|
id=id,
|
||||||
|
query=query,
|
||||||
|
})
|
||||||
|
return true
|
||||||
|
else
|
||||||
|
return false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
-- }}}
|
||||||
|
|
||||||
-- Network Handler {{{
|
-- Network Handler {{{
|
||||||
function BaseControl:_network_handler(remote, msg)
|
function BaseControl:_network_handler(remote, msg)
|
||||||
if msg.ty == Message.Hello then
|
if msg.ty == Message.Hello then
|
||||||
|
|
@ -397,6 +455,20 @@ function BaseControl:_network_handler(remote, msg)
|
||||||
end
|
end
|
||||||
elseif msg.ty == Message.VerbResponse then
|
elseif msg.ty == Message.VerbResponse then
|
||||||
-- Handled via pull
|
-- Handled via pull
|
||||||
|
elseif msg.ty == Message.ListenRequest then
|
||||||
|
if self.local_nouns[msg.noun] ~= nil then
|
||||||
|
if self.listeners[msg.noun] == nil then
|
||||||
|
self.listeners[msg.noun] = {}
|
||||||
|
end
|
||||||
|
self.listeners[msg.noun][msg.id] = {
|
||||||
|
query=msg.query,
|
||||||
|
addr=remote,
|
||||||
|
}
|
||||||
|
end
|
||||||
|
elseif msg.ty == Message.ListenNotify then
|
||||||
|
if self.listeners[msg.noun][msg.id] ~= nil then
|
||||||
|
self.listeners[msg.noun][msg.id].callback(msg.value)
|
||||||
|
end
|
||||||
else
|
else
|
||||||
error("TODO: MessageType Unknown")
|
error("TODO: MessageType Unknown")
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ function network.send(addr, port, msg)
|
||||||
|
|
||||||
-- print("Sending from "..current_node.." to "..addr..": "..msg)
|
-- print("Sending from "..current_node.." to "..addr..": "..msg)
|
||||||
-- print(require("inspect").inspect(active_node))
|
-- print(require("inspect").inspect(active_node))
|
||||||
|
|
||||||
latest_message[addr] = {
|
latest_message[addr] = {
|
||||||
from=current_node,
|
from=current_node,
|
||||||
port=port,
|
port=port,
|
||||||
|
|
|
||||||
136
tests/listening.lua
Normal file
136
tests/listening.lua
Normal file
|
|
@ -0,0 +1,136 @@
|
||||||
|
require("lunit")
|
||||||
|
|
||||||
|
local network = require("network")
|
||||||
|
local serialization = require("serialization")
|
||||||
|
local BaseControl = require("bc")
|
||||||
|
|
||||||
|
module("listening", package.seeall, lunit.testcase)
|
||||||
|
|
||||||
|
function test_invalid_query()
|
||||||
|
local bc = BaseControl:new()
|
||||||
|
local invalids = {
|
||||||
|
bc.Query.Equals,
|
||||||
|
bc.Query.Above,
|
||||||
|
bc.Query.Below,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, q in ipairs(invalids) do
|
||||||
|
assert_error_match("invalid", function()
|
||||||
|
bc:listen("invq1", q, function() end)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
function test_listen_simple()
|
||||||
|
local bc = BaseControl:new()
|
||||||
|
bc:register("lisimpl1", 1234)
|
||||||
|
bc:finalize()
|
||||||
|
|
||||||
|
local n = 0
|
||||||
|
bc:listen("lisimpl1", bc.Query.Change, function(new)
|
||||||
|
n = n + 1
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert_equal(0, n, "wrong number of listener invokations")
|
||||||
|
bc:set("lisimpl1", 4321)
|
||||||
|
assert_equal(1, n, "wrong number of listener invokations")
|
||||||
|
bc:set("lisimpl1", 42)
|
||||||
|
assert_equal(2, n, "wrong number of listener invokations")
|
||||||
|
end
|
||||||
|
|
||||||
|
function test_listen_network_simple()
|
||||||
|
local bc1 = BaseControl:new()
|
||||||
|
local addr1 = network.get_scene()
|
||||||
|
bc1:register("lin1", 1234)
|
||||||
|
bc1:finalize()
|
||||||
|
|
||||||
|
local bc2 = BaseControl:new()
|
||||||
|
local addr2 = network.get_scene()
|
||||||
|
|
||||||
|
local tmp, n = 0, 0
|
||||||
|
bc2:listen("lin1", bc1.Query.Change, function(new)
|
||||||
|
assert_equal(tmp, new, "value did not propagate")
|
||||||
|
n = n + 1
|
||||||
|
end)
|
||||||
|
|
||||||
|
network.set_scene(addr1)
|
||||||
|
tmp = 12
|
||||||
|
assert_equal(0, n, "wrong number of listener invokations")
|
||||||
|
bc1:set("lin1", 12)
|
||||||
|
assert_equal(1, n, "wrong number of listener invokations")
|
||||||
|
|
||||||
|
tmp = 16
|
||||||
|
bc1:set("lin1", 16)
|
||||||
|
assert_equal(2, n, "wrong number of listener invokations")
|
||||||
|
end
|
||||||
|
|
||||||
|
-- Generate testcases for queries
|
||||||
|
do
|
||||||
|
local tests = {
|
||||||
|
{name="change", query=BaseControl.Query.Change, pos1=1, neg=1, pos2=2},
|
||||||
|
{name="rising", query=BaseControl.Query.Rising, pos1=1, neg=-1, pos2=1},
|
||||||
|
{name="falling", query=BaseControl.Query.Falling, pos1=-1, neg=1, pos2=-1},
|
||||||
|
{name="equals", query=BaseControl.Query.Equals(123), pos1=123, neg=124, pos2=123},
|
||||||
|
{name="above", query=BaseControl.Query.Above(100), pos1=101, neg=99, pos2=123},
|
||||||
|
{name="below", query=BaseControl.Query.Below(100), pos1=12, neg=124, pos2=23},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test in ipairs(tests) do
|
||||||
|
-- Local test
|
||||||
|
listening["test_listen_query_"..test.name] = function()
|
||||||
|
local bc = BaseControl:new()
|
||||||
|
local noun = "lq"..test.name.."1"
|
||||||
|
bc:register(noun, 0)
|
||||||
|
bc:finalize()
|
||||||
|
|
||||||
|
local n = 0
|
||||||
|
bc:listen(noun, test.query, function(new)
|
||||||
|
n = n + 1
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert_equal(0, n, "wrong number of listener invokations")
|
||||||
|
bc:set(noun, test.pos1)
|
||||||
|
assert_equal(1, n, "wrong number of listener invokations")
|
||||||
|
bc:set(noun, test.neg)
|
||||||
|
assert_equal(1, n, "wrong number of listener invokations")
|
||||||
|
bc:set(noun, test.pos2)
|
||||||
|
assert_equal(2, n, "wrong number of listener invokations")
|
||||||
|
|
||||||
|
bc:close()
|
||||||
|
end
|
||||||
|
|
||||||
|
-- Remote test
|
||||||
|
listening["test_listen_network_query_"..test.name] = function()
|
||||||
|
local bc1 = BaseControl:new()
|
||||||
|
local addr1 = network.get_scene()
|
||||||
|
local noun = "lnq"..test.name.."1"
|
||||||
|
bc1:register(noun, 0)
|
||||||
|
bc1:finalize()
|
||||||
|
|
||||||
|
local bc2 = BaseControl:new()
|
||||||
|
local addr2 = network.get_scene()
|
||||||
|
|
||||||
|
local tmp, n = 0, 0
|
||||||
|
bc2:listen(noun, test.query, function(new)
|
||||||
|
assert_equal(tmp, new, "value did not propagate")
|
||||||
|
n = n + 1
|
||||||
|
end)
|
||||||
|
|
||||||
|
network.set_scene(addr1)
|
||||||
|
assert_equal(0, n, "wrong number of listener invokations")
|
||||||
|
tmp = test.pos1
|
||||||
|
bc1:set(noun, test.pos1)
|
||||||
|
assert_equal(1, n, "wrong number of listener invokations")
|
||||||
|
tmp = test.neg
|
||||||
|
bc1:set(noun, test.neg)
|
||||||
|
assert_equal(1, n, "wrong number of listener invokations")
|
||||||
|
tmp = test.pos2
|
||||||
|
bc1:set(noun, test.pos2)
|
||||||
|
assert_equal(2, n, "wrong number of listener invokations")
|
||||||
|
|
||||||
|
bc1:close()
|
||||||
|
network.set_scene(addr2)
|
||||||
|
bc2:close()
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
Loading…
Add table
Add a link
Reference in a new issue