Browse Source

Use a signal for messages

This commits changes the behavior to read from a signal instead of
polling each filter.

We receive a signal from status-go every 0.3 seconds, only if new
messages are received. We receive a single signal for all the chats, and
we don't dispatch anymore on every message.

Signed-off-by: Andrea Maria Piana <>
Andrea Maria Piana 1 month ago
No account linked to committer's email address

+ 1
- 1
STATUS_GO_SHA256 View File

@@ -1,3 +1,3 @@
## DO NOT EDIT THIS FILE BY HAND. USE `scripts/ <tag>` instead


+ 1
- 1

@@ -1,3 +1,3 @@
## DO NOT EDIT THIS FILE BY HAND. USE `scripts/ <tag>` instead


+ 2
- 2
src/status_im/accounts/logout/core.cljs View File

@@ -11,9 +11,9 @@
[{:keys [db] :as cofx}]
(fx/merge cofx
{:keychain/clear-user-password (get-in db [:account/account :address])
:dispatch [:accounts.logout/filters-removed]
:dev-server/stop nil}
#(re-frame/dispatch [:accounts.logout/filters-removed]))

(fx/defn leave-account

+ 2
- 1
src/status_im/signals/core.cljs View File

@@ -79,4 +79,5 @@
"subscriptions.error" (ethereum.subscriptions/handle-error cofx event)
"status.chats.did-change" (chat.loading/load-chats-from-rpc cofx)
"whisper.filter.added" (transport.filters/handle-negotiated-filter cofx event)
(log/debug "Event " type " not handled"))))
"" (transport.message/receive-messages cofx event)
(log/debug "Event " type " not handled" event))))

+ 3
- 12
src/status_im/transport/core.cljs View File

@@ -36,8 +36,6 @@

(fx/defn init-whisper
"Initialises whisper protocol by:
- adding fixed shh discovery filter
- restoring existing symetric keys along with their unique filters
- (optionally) initializing mailserver"
[{:keys [db web3 all-installations] :as cofx}]
(fx/merge cofx
@@ -49,13 +47,6 @@
(message/resend-contact-messages [])))

(fx/defn stop-whisper
"Stops whisper protocol by removing all existing shh filters
It is necessary to remove the filters because status-go there isn't currently a logout feature in status-go
to clean-up after logout. When logging out of account A and logging in account B, account B would receive
account A messages without this."
[{:keys [db] :as cofx} callback]
(let [{:transport/keys [filters]} db]
(transport.filters/stop-filters callback)
"Stops whisper protocol"
(publisher/stop-fx cofx))

+ 15
- 65
src/status_im/transport/filters/core.cljs View File

@@ -27,16 +27,13 @@

;; fx functions

(defn load-filter-fx [web3 filters]
{:filters/load-filters [[web3 filters]]})
(defn load-filter-fx [filters]
{:filters/load-filters filters})

(defn remove-filter-fx [filters]
(when (seq filters)
{:filters/remove-filters filters}))

(defn stop-filter-fx [filters callback]
{:filters/stop-filters [filters callback]})

;; dispatches

(defn filters-added! [filters]
@@ -45,9 +42,6 @@
(defn filters-removed! [filters]
(re-frame/dispatch [:filters.callback/filters-removed filters]))

(defn- receive-message [chat-id js-error js-message]
(re-frame/dispatch [:transport/messages-received js-error js-message chat-id]))

;; Mailserver topics

(fx/defn upsert-mailserver-topic
@@ -192,28 +186,6 @@

;; shh filters

(defn build-filter
"Create a raw filter from a filter response"
[web3 {:keys [filter-id
chat-id] :as f}]
(let [shh-filter (.newRawMessageFilter
(utils/shh web3)
(clj->js {:allowP2P true
:filterId filter-id})
;; We set chat-id to nil on discovery or negotiated
;; as we have multiple people sending on discovery and pairing
;; messages on negotiated
(partial receive-message (if (or discovery? negotiated?) nil chat-id)))]
(assoc f :filter shh-filter)))

(defn- add-filters!
[web3 filters]
(log/debug "PERF" :add-raw-filters filters)
(map (partial build-filter web3) filters)))

(defn- responses->filters [{:keys [negotiated
@@ -266,8 +238,7 @@
(when (seq new-filters)
{:web3 (get-in cofx [:db :web3])
:filters new-filters}})))
{:filters new-filters}})))

(fx/defn load-filters
"Load all contacts and chats as filters"
@@ -278,18 +249,7 @@
filters (concat
(chats->filter-requests chats)
(contacts->filter-requests contacts))]
(load-filter-fx (:web3 db) filters)))

(defn stop-watching! [filters]
(doseq [f filters]
(.stopWatching f (constantly nil))))

(fx/defn stop-filters
"Stop all filters"
[{:keys [db]} callback]
(let [filters (map :filter (vals
(:filter/filters db)))]
(stop-filter-fx filters callback)))
(load-filter-fx filters)))

;; Load functions: utility function to load filters

@@ -299,20 +259,20 @@
(when (and (filters-initialized? db)
(not (chat-loaded? db chat-id)))
(let [chat (get-in db [:chats chat-id])]
(load-filter-fx (:web3 db) (->filter-request chat)))))
(load-filter-fx (->filter-request chat)))))

(fx/defn load-contact
"Check if we already have a filter for that contact, otherwise load the filter
if the contact has been added"
[{:keys [db]} contact]
(when-not (chat-loaded? db (:public-key contact))
(load-filter-fx (:web3 db) (contacts->filter-requests [contact]))))
(load-filter-fx (contacts->filter-requests [contact]))))

(fx/defn load-member
"Check if we already have a filter for that member, otherwise load the filter, regardless of whether is in our contacts"
[{:keys [db]} public-key]
(when-not (chat-loaded? db public-key)
(load-filter-fx (:web3 db) (->filter-request {:chat-id public-key}))))
(load-filter-fx (->filter-request {:chat-id public-key}))))

(fx/defn load-members
"Load multiple members"
@@ -360,9 +320,9 @@

(fn [{:keys [web3 filters]}]
(fn [{:keys [filters]}]
(log/debug "PERF" :filters/add-raw-filters)
(add-filters! web3 filters)))
(filters-added! filters)))

;; Here we stop first polling and then we hit status-go, otherwise it would throw
;; an error trying to poll from a delete filter. If we fail to remove the filter though
@@ -371,26 +331,16 @@
(fn [filters]
(log/debug "removing filters" filters)
(stop-watching! (map :filter filters))
(map ->remove-filter-request filters)
#(filters-removed! filters)
#(log/error "remove-filters: failed error" %))))

(fn [[filters callback]]
(stop-watching! filters)

(fn [ops]
(when (seq ops)
(let [web3 (first (peek ops))
filters (mapcat peek ops)]
#(add-filters! web3
(map responses->filters %))
#(log/error "load-filters: failed error" %))))))
(fn [filters]
#(filters-added! (map responses->filters %))
#(log/error "load-filters: failed error" %))))

+ 27
- 18
src/status_im/transport/message/core.cljs View File

@@ -14,28 +14,21 @@
[status-im.utils.fx :as fx]
[taoensso.timbre :as log]))

(defn unwrap-message
"Extract message from new payload {:id some-id :message some-message}
or old (just plain message)"
(let [clj-message (js->clj js-message :keywordize-keys true)
{:keys [message id]} clj-message]
{:message (or message clj-message)
:raw-payload (if message
(o/get js-message "message")
:id id}))
(defn add-raw-payload
"Add raw payload for id calculation"
[{:keys [message] :as m}]
(assoc m :raw-payload (clj->js message)))

(fx/defn receive-message
"Receive message handles a new status-message.
dedup-id is passed by status-go and is used to deduplicate messages at that layer.
Once a message has been successfuly processed, that id needs to be sent back
in order to stop receiving that message"
[cofx now-in-s filter-chat-id js-message]
[cofx now-in-s filter-chat-id message]
(let [blocked-contacts (get-in cofx [:db :contacts/blocked] #{})
{{:keys [payload sig timestamp ttl]} :message
dedup-id :id
raw-payload :raw-payload} (unwrap-message js-message)
raw-payload :raw-payload} (add-raw-payload message)
status-message (-> payload
@@ -63,15 +56,31 @@

(fx/defn receive-whisper-messages
[{:keys [now] :as cofx} js-error js-messages chat-id]
(if (and (not js-error)
[{:keys [now] :as cofx} error messages chat-id]
(if (and (not error)
(let [now-in-s (quot now 1000)
receive-message-fxs (map (fn [message]
(receive-message now-in-s chat-id message))
(js-obj->seq js-messages))]
(apply fx/merge cofx receive-message-fxs))
(log/error "Something went wrong" js-error js-messages)))
(log/error "Something went wrong" error messages)))

(fx/defn receive-messages [cofx event]
(let [fxs (map
(fn [{:keys [chat messages error]}]
;; For discovery and negotiated filters we don't
;; set a chatID, and we use the signature of the message
;; to indicate which chat it is for
(if (or (:discovery chat)
(:negotiated chat))
(:chatId chat))))
(:messages event))]
(apply fx/merge cofx fxs)))

(fx/defn remove-hash
[{:keys [db] :as cofx} envelope-hash]

+ 9
- 8
test/cljs/status_im/test/transport/core.cljs View File

@@ -56,14 +56,15 @@

(def sig "0x04325367620ae20dd878dbb39f69f02c567d789dd21af8a88623dc5b529827c2812571c380a2cd8236a2851b8843d6486481166c39debf60a5d30b9099c66213e4")

(def messages #js [{:sig sig
:ttl 10
:timestamp 1527692015
:topic "0x9c22ff5f"
:payload "0x5b227e236334222c5b2246222c22746578742f706c61696e222c227e3a7075626c69632d67726f75702d757365722d6d657373616765222c3135323736393230313433383130312c313532373639323031343337375d5d"
:padding "0xbf06347cc7f9aa18b4a846032264a88f559d9b14079975d14b10648847c0543a77a80624e101c082d19b502ae3b4f97958d18abf59eb0a82afc1301aa22470495fac739a30c2f563599fa8d8e09363a43d39311596b7f119dee7b046989c08224f1ef5cdc385"
:pow 0.002631578947368421
:hash "0x220ef9994a4fae64c112b27ed07ef910918159cbe6fcf8ac515ee2bf9a6711a0"}])
(def messages [{:id "someid"
:message {:sig sig
:ttl 10
:timestamp 1527692015
:topic "0x9c22ff5f"
:payload "0x5b227e236334222c5b2246222c22746578742f706c61696e222c227e3a7075626c69632d67726f75702d757365722d6d657373616765222c3135323736393230313433383130312c313532373639323031343337375d5d"
:padding "0xbf06347cc7f9aa18b4a846032264a88f559d9b14079975d14b10648847c0543a77a80624e101c082d19b502ae3b4f97958d18abf59eb0a82afc1301aa22470495fac739a30c2f563599fa8d8e09363a43d39311596b7f119dee7b046989c08224f1ef5cdc385"
:pow 0.002631578947368421
:hash "0x220ef9994a4fae64c112b27ed07ef910918159cbe6fcf8ac515ee2bf9a6711a0"}}])

(deftest receive-whisper-messages-test
(testing "an error is reported"

+ 9
- 9
test/cljs/status_im/test/transport/filters/core.cljs View File

@@ -93,19 +93,19 @@

(deftest load-member
(testing "it returns fx for a member"
(is (= {:filters/load-filters [[nil [{:ChatID "0xchat-id-2"
:OneToOne true
:Identity "chat-id-2"}]]]}
(is (= {:filters/load-filters [{:ChatID "0xchat-id-2"
:OneToOne true
:Identity "chat-id-2"}]}
(transport.filters/load-member {:db {}} "0xchat-id-2"))))
(testing "merging fx"
(is (=
{:db {}
:filters/load-filters [[nil [{:ChatID "0xchat-id-1"
:OneToOne true
:Identity "chat-id-1"}]]
[nil [{:ChatID "0xchat-id-2"
:OneToOne true
:Identity "chat-id-2"}]]]}
:filters/load-filters [{:ChatID "0xchat-id-1"
:OneToOne true
:Identity "chat-id-1"}
{:ChatID "0xchat-id-2"
:OneToOne true
:Identity "chat-id-2"}]}
(apply fx/merge {:db {}}
(map transport.filters/load-member ["0xchat-id-1" "0xchat-id-2"]))))))