From 719201e9503abab2f7a018413f7bbcda3a74a7c6 Mon Sep 17 00:00:00 2001 From: Rahix Date: Wed, 17 Apr 2019 02:26:57 +0200 Subject: [PATCH] Implement listening Signed-off-by: Rahix --- bc.lua | 84 +++++++++++++++++++++++++-- network.lua | 1 + tests/listening.lua | 136 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 215 insertions(+), 6 deletions(-) create mode 100644 tests/listening.lua diff --git a/bc.lua b/bc.lua index e15bf4a85533..99a250c16b71 100644 --- a/bc.lua +++ b/bc.lua @@ -17,16 +17,18 @@ local Message = { ListenCancel = 0x4c535450, } -local function query_param(self, v) - return {ty=self.ty, v=v} -end +local query_param = { + __call=function(self, v) + return {ty=self.ty, v=v} + end, +} local Query = { Change = {ty=0x434847}, Rising = {ty=0x524953}, Falling = {ty=0x46414c}, - Equals = setmetatable({ty=0x455155}, {__call=query_param}), - Above = setmetatable({ty=0x414256}, {__call=query_param}), - Below = setmetatable({ty=0x424c4f}, {__call=query_param}), + Equals = setmetatable({ty=0x455155, invalid=true}, query_param), + Above = setmetatable({ty=0x414256, invalid=true}, query_param), + Below = setmetatable({ty=0x424c4f, invalid=true}, query_param), } -- Network ---------------------------------------------------------------- {{{ @@ -99,12 +101,14 @@ local BaseControl = { Network = Network, } +-- Lifecycle {{{ function BaseControl:new(network) local self = { local_nouns = {}, local_verbs = {}, remote_nouns = {}, remote_verbs = {}, + listeners = {}, live = false, network = network or Network:new(), @@ -222,6 +226,7 @@ function BaseControl:close() setmetatable(self, nil) end +-- }}} -- Nouns {{{ function BaseControl:has_noun(noun) @@ -272,7 +277,31 @@ function BaseControl:set(name, value) if type(value) == "function" then error("\""..name.."\" can't be cast into a verb") else + local old = self.local_nouns[name] self.local_nouns[name] = value + + if self.listeners[name] ~= nil then + for id, l in pairs(self.listeners[name]) do + if (l.query.ty == Query.Change.ty and value ~= old) + or (l.query.ty == Query.Rising.ty and value > old) + or (l.query.ty == Query.Falling.ty and value < old) + or (l.query.ty == Query.Equals.ty and value == l.query.v) + or (l.query.ty == Query.Above.ty and value > l.query.v) + or (l.query.ty == Query.Below.ty and value < l.query.v) + then + if l.callback ~= nil then + l.callback(value) + else + self.network:send(l.addr, { + ty=Message.ListenNotify, + noun=name, + id=id, + value=value, + }) + end + end + end + end end elseif self.local_verbs[name] ~= nil then if type(value) ~= "function" then @@ -347,6 +376,35 @@ function BaseControl:call_sync(verb, ...) end -- }}} +-- Listening {{{ +function BaseControl:listen(noun, query, callback) + if query.invalid then + error("invalid query, forgot a parameter?") + end + if self.listeners[noun] == nil then + self.listeners[noun] = {} + end + local id = uuid.next() + self.listeners[noun][id] = { + query=query, + callback=callback, + } + if self.local_nouns[noun] ~= nil then + return true + elseif self.remote_nouns[noun] ~= nil then + self.network:send(self.remote_nouns[noun], { + ty=Message.ListenRequest, + noun=noun, + id=id, + query=query, + }) + return true + else + return false + end +end +-- }}} + -- Network Handler {{{ function BaseControl:_network_handler(remote, msg) if msg.ty == Message.Hello then @@ -397,6 +455,20 @@ function BaseControl:_network_handler(remote, msg) end elseif msg.ty == Message.VerbResponse then -- Handled via pull + elseif msg.ty == Message.ListenRequest then + if self.local_nouns[msg.noun] ~= nil then + if self.listeners[msg.noun] == nil then + self.listeners[msg.noun] = {} + end + self.listeners[msg.noun][msg.id] = { + query=msg.query, + addr=remote, + } + end + elseif msg.ty == Message.ListenNotify then + if self.listeners[msg.noun][msg.id] ~= nil then + self.listeners[msg.noun][msg.id].callback(msg.value) + end else error("TODO: MessageType Unknown") end diff --git a/network.lua b/network.lua index e45a125fb17b..7f927a9775a2 100644 --- a/network.lua +++ b/network.lua @@ -36,6 +36,7 @@ function network.send(addr, port, msg) -- print("Sending from "..current_node.." to "..addr..": "..msg) -- print(require("inspect").inspect(active_node)) + latest_message[addr] = { from=current_node, port=port, diff --git a/tests/listening.lua b/tests/listening.lua new file mode 100644 index 000000000000..0115bf44cd8e --- /dev/null +++ b/tests/listening.lua @@ -0,0 +1,136 @@ +require("lunit") + +local network = require("network") +local serialization = require("serialization") +local BaseControl = require("bc") + +module("listening", package.seeall, lunit.testcase) + +function test_invalid_query() + local bc = BaseControl:new() + local invalids = { + bc.Query.Equals, + bc.Query.Above, + bc.Query.Below, + } + + for _, q in ipairs(invalids) do + assert_error_match("invalid", function() + bc:listen("invq1", q, function() end) + end) + end +end + +function test_listen_simple() + local bc = BaseControl:new() + bc:register("lisimpl1", 1234) + bc:finalize() + + local n = 0 + bc:listen("lisimpl1", bc.Query.Change, function(new) + n = n + 1 + end) + + assert_equal(0, n, "wrong number of listener invokations") + bc:set("lisimpl1", 4321) + assert_equal(1, n, "wrong number of listener invokations") + bc:set("lisimpl1", 42) + assert_equal(2, n, "wrong number of listener invokations") +end + +function test_listen_network_simple() + local bc1 = BaseControl:new() + local addr1 = network.get_scene() + bc1:register("lin1", 1234) + bc1:finalize() + + local bc2 = BaseControl:new() + local addr2 = network.get_scene() + + local tmp, n = 0, 0 + bc2:listen("lin1", bc1.Query.Change, function(new) + assert_equal(tmp, new, "value did not propagate") + n = n + 1 + end) + + network.set_scene(addr1) + tmp = 12 + assert_equal(0, n, "wrong number of listener invokations") + bc1:set("lin1", 12) + assert_equal(1, n, "wrong number of listener invokations") + + tmp = 16 + bc1:set("lin1", 16) + assert_equal(2, n, "wrong number of listener invokations") +end + +-- Generate testcases for queries +do + local tests = { + {name="change", query=BaseControl.Query.Change, pos1=1, neg=1, pos2=2}, + {name="rising", query=BaseControl.Query.Rising, pos1=1, neg=-1, pos2=1}, + {name="falling", query=BaseControl.Query.Falling, pos1=-1, neg=1, pos2=-1}, + {name="equals", query=BaseControl.Query.Equals(123), pos1=123, neg=124, pos2=123}, + {name="above", query=BaseControl.Query.Above(100), pos1=101, neg=99, pos2=123}, + {name="below", query=BaseControl.Query.Below(100), pos1=12, neg=124, pos2=23}, + } + + for _, test in ipairs(tests) do + -- Local test + listening["test_listen_query_"..test.name] = function() + local bc = BaseControl:new() + local noun = "lq"..test.name.."1" + bc:register(noun, 0) + bc:finalize() + + local n = 0 + bc:listen(noun, test.query, function(new) + n = n + 1 + end) + + assert_equal(0, n, "wrong number of listener invokations") + bc:set(noun, test.pos1) + assert_equal(1, n, "wrong number of listener invokations") + bc:set(noun, test.neg) + assert_equal(1, n, "wrong number of listener invokations") + bc:set(noun, test.pos2) + assert_equal(2, n, "wrong number of listener invokations") + + bc:close() + end + + -- Remote test + listening["test_listen_network_query_"..test.name] = function() + local bc1 = BaseControl:new() + local addr1 = network.get_scene() + local noun = "lnq"..test.name.."1" + bc1:register(noun, 0) + bc1:finalize() + + local bc2 = BaseControl:new() + local addr2 = network.get_scene() + + local tmp, n = 0, 0 + bc2:listen(noun, test.query, function(new) + assert_equal(tmp, new, "value did not propagate") + n = n + 1 + end) + + network.set_scene(addr1) + assert_equal(0, n, "wrong number of listener invokations") + tmp = test.pos1 + bc1:set(noun, test.pos1) + assert_equal(1, n, "wrong number of listener invokations") + tmp = test.neg + bc1:set(noun, test.neg) + assert_equal(1, n, "wrong number of listener invokations") + tmp = test.pos2 + bc1:set(noun, test.pos2) + assert_equal(2, n, "wrong number of listener invokations") + + bc1:close() + network.set_scene(addr2) + bc2:close() + end + end +end