Vstříc řazení v lineárním čase

Obecné řazení v lineárním čase je meta, které není možné dosáhnout1. Ve výsledné složitosti se vždycky vyskytne log(n) faktor. Může být maskovaný za něco jiného, ale vždycky tam bude. Třeba takový LSD radix sort má složitost O(kn), kde k je právě log(n) v převleku.

Ale to, že nemůžeme vyhrát na teoretické frontě, není důležité. Záleží jenom na tom, jestli můžeme vyhrát prakticky. Algoritmus nemusí být O(n), stačí jen když se bude tvářit jako O(n) pro všechny vstupy, které mu můžu realisticky předhodit. Dokonce ani O(n) čas není důležitý, záleží jen na rychlosti.

O tom je tenhle článek: Nesnažím se vyhrát, jen prohrávat co nejméně.


Jeden způsob velice rychlého řazení je least significan digit (LSD) radix sort. Pro vstupní pole libovolné délek je cena na prvek v postatě plochá (pro 4B hodnoty se pohybuje na úrovni 15ns/prvek). Jeho nevýhoda spočívá v tom, že nejlépe funguje jen pro krátké bitové stringy konstantní délky jako jsou 4B inty nebo 8B longy. Pracuje tedy na omezeném rozsahu hodnot (232 nebo 264) namísto nekonečné množiny obecných řetězců. Pro ty jsou jiné algoritmy: most significat digit (MSD) radix sort nebo burst sort. Ty můžou být rychlejší než klasické quick/merge/heap sorty, ale pořád jsou pomalejší než velice kompaktní LSD radix sort.

V ideálním světě by mělo být možné využít LSD radix sortu jako dílčí komponenty k řazení dlouhých stringů. Dlouhou dobu jsem přemýšlel jak to udělat co nejlépe, protože nejjednodušší řešení – řazení LSD radix sortem podle prefixů stringů – nefunguje zas tak dobře.

V pseudokódu, který se podobá Scale, to může vypadat nějak takhle:

val strings: Array[String] = ???

val packed = new Array[Long](strings.length)

for (i <- 0 until strings.length) {
  val s = strings(i)

  // zabalit 4B prefix a 4B index do jednoho 8B longu
  packed(i) = prefixOfString(s).toLong << 32 | i
}

// samotné řazení
val sorted = RadixSort(packed, sortBytes = 4 until 8)

sorted
  .groupBy(p => p >> 32) // rozlámat výstup na bloky se stejným prefixem
  .map { case (prefix, ps) =>
    ps.map(p => p & 0xffffffff) // extrahovat index
      .map(i => strings(i))     // převést na vstupní stringy
      .sorted                   // výsledné skupiny seřadit jiným algoritmem
  }.flatten

Problém tohoto přístupu je malá informační hodnota prefixů. Pokud bych takto řadil Javovské stringy obsahující především znaky [a-zA-Z0–9], bylo by to neslavné, protože Java string je v jádru pole dvoubajtových charů.

String "k47" je tvořen bajty 0 6B 0 34 0 37. Liché bajty nepřináší žádnou informaci a ty sudé v tomto případě obsahují jednu z 62 možných hodnot. Jeden char tedy má jen něco málo pod 6 bitů informace.

Ve skutečnosti je to ještě horší. Některé znaky jsou častější než jiné a tedy přináší ještě méně informace. 4 bajty prefixu můžou mít hodně pod 12 bitů informace, které můžou v nejlepším případě diskriminovat mezi 4096 různými prefixy. To není nic moc.5

Radixsortování podle prefixu nefunguje tak dobře jak by mělo navzdory tomu, že 4B int dokáže diskriminovat mezi více jak 4 miliardami hodnot. Tohle mi nějakou dobu leželo na mysli. Potřeboval bych funkci, která mapuje vstupní string na 4B int, který obsahuje víc informace než 4B prefix a tedy lépe rozdělí vstupní data.

Kdybych měl funkci, která přetransformuje string na jeho pořadové číslo, mám vyhráno. Kdyby ta funkce běžela v O(1), měl bych řazení v O(n). To ale není možné, protože taková funkce by záležela na všech n vstupních hodnotách.

Místo toho mi stačí funkce, která komprimuje prefix, tedy využije maximum možných informace z delšího prefixu a zkomprimuje je do 4B nebo 8B čísla, které pak použiji k řazení rychlým LSD radix sortem. Je potřeba jen aby funkce nezměnila vzájemné pořadí prvků. Může namapovat několik různých na jeden prefix, to je povolené a nevyhnutelné, ale nesmí změnit jejich vzájemné pořadí. Tedy například:

f(s) = Σ 1 / (s_i-96)^i

f("")       = 0.0
f("a")      = 0.0384
f("aa")     = 0.0399
f("aaaa")   = 0.03999
f("b")      = 0.0769
f("ggggga") = 0.2796
f("gggggz") = 0.2796 // kolize
f("z")      = 0.961

a = b  => f(a) = f(b)
a < b  => f(a) <= f(b)

Tuhle funkci můžu zkonstruovat ručně, pokud něco vím o řazených datech2 nebo ji můžu vytvořit statisticky ze vzorku vstupních stringů (předpokládám, že jednotlivé bajty jsou na sobě nezávislé).6 Ty nejčastější poskytují minimum informace a potřebuji tedy víc dalších bajtů, abych dostal dost informace pro přesné mapování. Ty nejméně časté poskytují víc informace a samy o sobě vedou k relativně přesné diskriminaci.7

Spočítám relativní frekvence jednotlivých bajtů, ty využiji, abych převedl každý string na pozici na intervalu 0.0 – 1.0 a tu pak převedu na plný interval longu` (jen se bojím, že se v tomto kroku můžou vyskytnout chyby zaokrouhlování, které poruší požadavek na zachování pořadí).

Ukázka mapování se čtyřmi možnými hodnotami – a je velice časté, b a c jsou méně časté a d je raritní:

Když mám tuhle funkci, aplikuji ji na každý řazený prvek (v ukázce pseudokódu to nahradí volání prefixOfString), sestavím pole dvojic (prefix, index) zabalené do longů, seřadím radix sortem a pak rozsekám remízy jiným řadícím algoritmem3 .

A to je všechno. Nic víc v tom není. Používám starý známý radix sort, změním jen data na kterých ho pouštím.

Dál se dá aplikovat jen pár triků. Například můžu využít co nejvíce bitů v longu do kterého je zakódován pár (prefix, index). Index má omezené rozmezí a na jeho reprezentaci je třeba méně místa než plných 32 bitů. Například pro 1 milion indexů stačí 20 bitů.

Místo

|*******|*******|*******|*******|*******|*******|*******|*******|
[ prefix 32 bitů                |                 index 32 bitů ]

pak můžu použít

|*******|*******|*******|*******|*******|*******|*******|*******|
[ prefix 44 bitů                            |     index 20 bitů ]

Ve stejných 8 bajtech dostanu delší prefix, který bude lépe diskriminovat. Radix sort sice udělá víc průchodů (v mém případě řadí v jenom průchodu jeden bajt, ale je možné napsat verzi, která řadí po 10 bitech), ale to není problém, protože RS je velice rychlý a v porovnání s ostatními fázemi běží v zanedbatelném ča­se.

Ale pořád je třeba alokovat 24 bajtů na každý řazený prvek. To se nedá nijak obejít a je s tím třeba počítat. Samotná rychlost alokace ale nepředstavuje problém.

Výsledky jsou uspokojivé. f+sort je první jednodušší verze, která tohle používá jen jako preprocessing. Vezme celý výsledek LSD radix sortu tak jak je a seřadí ho. Javovský TimSort doslova letí na už téměř seřazených vstupech. f+s*rt je to, co popisuji výše.

Řazení slov z českých textů:

Rychlost na prvek byla téměř shodná pro 900000 slov i 4 miliony.

Řazení náhodně generovaných stringů [a-zA-Z0–9]+:

V tomto případě mapování funguje fantasticky a téměř vůbec není třeba dělat druhotné řazení. To je způsobeno tím, že stringy jsou dost dlouhé, náhodně rozložené a téměř bez duplicit.

Úzká hrdla výkonu překvapivě představuje procházení prvků při komprimaci prefixu, přerovnání pointerů podle výsledného pořadí radix sortu a inspekce samotných prvků při řazení (V ukázce kódu výše to odpovídá funkci prefixOfString, řádku „převést na vstupní stringy“ a pak závěrečné metodě sorted). Každý tento přesun způsobí jeden nevyhnutelný cache-miss, který na mém stroji v nejlepším případě trvá 16ns. Tedy něco kolem 32–48 nanosekund je promrháno na dvou nebo třech mov instrukcích4, zbytek trvá komprimace prefixů, příprava dat pro radix sort, samotný radix sort a závěrečné řazení.

Prostoru pro zlepšení už moc není a tohle může být velice blízko maximální rychlosti, které je možné vůbec dosáhnout. A to podle mě docela dobře odpovídá původnímu požadavku prohrávat co nejméně.


Dále k tématu:


Pozn:

  1. Řadit v O(n) se dá jen ve speciálních případech, kdy vstupní data mají omezený rozsah hodnot (Pigeonhole sort, Counting sort) nebo známé rozložení (Bucket sort).
  2. Můžu třeba stringy převést na číselníky, kdy číselná reprezentace pořadím odpovídá pořadí stringů jako v některých sloupcových databázích.
  3. Tento případ lehce komplikuje fakt, že nevím kolik bajtů prefixu jsem použil a kolik tedy můžu bezpečně přeskočit, kdybych chtěl druhé kolo řazení provést algoritmem založeném na radixech. Můžu určit jenom dolní mez.
  4. Tady by se hodilo mít nějaký Céčkovitější jazyk nebo jazyk, který má structy. Pak bych mohl index nahradit přímo pointerem a ušetřit si jeden cache-miss.
  5. Ale i když to nebude fungovat ideálně, LSD RS nezabere tolik času a udělá aspoň nějakou diskriminaci, která aspoň trochu urychlí následující řazení.
  6. Logický další krok je použít strojové učení pro konstrukci kompresní funkce.
  7. Pro stringy se vyplatí zjišťovat frekvence sudých a lichých bajtů zvlášť, protože mají velice rozdílné frekvence.

Flattr this!

Posted in Algo, řazení | 2 Comments

Maximálně negativní

Když už jsem v tom, tak bych po minulém článku, tu mohl zmínit dvě drobnosti, které se nesou v podobném duchu. Jedna je jakž takž užitečná a druhá je jen intelektuální kuriozita.

#1

Nějakou dobu jsem ve Scale při řazení podle intů používal konstrukci

xs.sortBy(i => -i)

jako zkratku pro

xs.sorted.reverse

To je pěkně kompaktní, potenciálně efektivnější (nevytváří se jedna dočasná kolekce) a hlavně je to špatně.

Koho napadne, co může být špatně na jednom jediném znaku?

Problém je v tom, že znaménkové inty mají o jednu negativní hodnotu víc než těch pozitivních. Proto platí, že

Int.MinValue == -Int.MinValue

math.abs(Int.MinValue) == Int.MinValue

Z toho plyne, že když řadím podle -i, Int.MinValue bude chybně zařazený na začátek. Naštěstí existuje jednoduché řešení, které má taky jen jeden znak:

xs.sortBy(i => ~i)

Kdyby rozdíl nebyl vidět, místo mínusu je tilda, místo číselné negace je bitová negace. Pro tu platí, že

~Int.MinValue == Int.MaxValue
~0 = -1
~1 = -2

a všechno je zase v pořádku.

#2

Nedávno jsem přemýšlel nad následujícím problémem: Je možné porovnat dva znaménkové inty a dostat výsledek jako jeden bit bez CMP a SETE x86 instrukcí?

S CMP a SETE instrukcemi je to brnkačka. CMP porovná dvě hodnoty a nastaví příznaky do stavového registru. SETE pak extrahuje bit označující výsledek rovnosti. Ale jde to i bez nich?

Překvapivě ano.

Rovnost a == b je možné zapsat jako

a >= b & a <= b

Když tohle platí, a a b si musí být rovny. Výraz je pochopitelně rovný tomuhle:

!(a<b) & !(a>b)

Jestliže a < b, tak platí, že b-a je negativní a má tedy nejvyšší bit nastavený na jedničku. Můžu tedy nerovnost přepsat jako

~((b-a) >>> 31) & ~((a-b) >>> 31)

protože všechny operace jsou bitově paralelní, můžu výraz zjednodušit na

(~(b-a) & ~(a-b)) >>> 31

a pak na to hodit De Morgana

~((b-a) | (a-b)) >>> 31

Pět operací, to není špatné.

Za pozornost stojí, že obě nerovnosti můžou přetéct a vrátit špatný bit. Nikdy se však nestane, že přeteče jenom jedna z nich a zkazí výsledek. Obě nerovnosti vždy přetékají v tandemu. Když jedna přeteče a zkazí výsledek, druhá přeteče taky a opraví ho.

Teď jen přijít na to, kde by se to dalo použít.

Co se týká rychlosti, nativní CMP+SETE je pochopitelně rychlejší.


Když už jsem v tom, měl bych zmínit pár paperů, které přišly s užitečnými bit twidlling hacky.

Prvním je Faster Population Counts using AVX2 Instructions, který překonává dřívější SSSE3 fast popcount. Chytrým využitím SIMD instrukcí a bitově paralelního kódu je možné napsat population count, který je 2× rychlejší než nativní popcnt instrukce.

Dalším je Fast Sorted-Set Intersection using SIMD Instructions, který ukazuje jak napsat rychlé sjednocování množin za pomocí nejCISCovatější ze všech x86 CISC instrukci PCMPESTRM (hodí se to např. pro Jaccardovu podobnost)

A nakonec pak Sorting improves word-aligned bitmap indexes, což je tour de force publikace o (komprimovaných) bitových indexech.

To je všechno, rozejděte se.


Dále k tématu:

Flattr this!

Posted in CPU, low level | Leave a comment

99.00000000000000009 problémů s floating point čísly

Floating point čísla a jejich IEEE 754 varianta jsou jednou z těch věcí, které mě nikdy nepřestanou fascinovat. Jde o užitečnou věc, která člověka skoro přesvědčí, že všechno bude ok, že svět má svůj řád, že se stačí odevzdat floating vlnám a už se není třeba o nic starat. Hned potom ale narazí na zaokrouhlovací chyby, nekonečna, NaNy, negativní nulu, denormální čísla a hned z toho narkotického blaha vystřízliví.

V tomto duchu začal 13. sraz přátel PHP – historkou o klasickém případě, kdy floaty nepoužívat – k reprezentaci pe­něz.

Floaty pochopitelně nejsou přesné a zaokrouhlovací chyby vznikají i v triviálních případech. Například 0.1 + 0.2 je 0.30000000000000004. Pokud to šlo peníze, najednou jsem z ničeho vytvořil čtyři femtohalíře.

Tyto chyby jsou malé a nenápadné a člověk si je uvědomí až ve chvíli, kdy mu kancelář začnou obléhat auditoři s beranidly a pokřikují něco o tom, že nesedí účetnictví. Kdyby se peníze ukládaly do intů, tak v nejhorším případě částka přeteče do velkého množství záporných peněz a to je chyba, které si nově vzniklý záporný milionář snadno všimne.

Samozřejmě, na peníze a jiné kvantity, které je třeba vést naprosto přesně, se musí použít něco jako BigInteger nebo BigDecimal.

A to je jenom začátek.

Některé jazyky jako třeba PHP nebo JavaScript přidávají svoje speciality, protože v nich celá čísla automaticky přetékají do floatů.

Například tento PHP kód

for ($i = 0; $i != ($i+1); $i++) { echo "lol"; }

vypadá jak nekonečná smyčka, protože se přece nikdy nemůže stát, že $i je stejné jako $i+1. PHP na to má ale jiný názor. Int eventuálně přeteče do double floatu s nižší přesností, které mají krok mezi dvěma sousedními čísly větší než 1, součet nic nezmění a smyčka skončí. V PHP je int jen double, který čeká na svůj okamžik slávy.

Ve slušných jazycích, by int přetekl do záporných hodnot, ale pořád by platilo, že i a i+1 jsou rozdílné.

Pro zajímavost: celá čísla v intervalu [-224, 224] můžou být v single precision floatu uloženy přesně bez zaokrouhlovacích chyb.


Asociativita je užitečná vlastnost, která je stěžejní pro paralelizaci. Tomu si, jak doufám, žádný slušný člověk nedovolí odporovat. Floaty pochopitelně asociativní nejsou.

val ds = Array.fill(10000)(util.Random.nextDouble)
assert(ds.sum - util.Random.shuffle(ds).sum == 0.0)

val ds = Array.fill(10000) {
        val a, b, c = util.Random.nextDouble
        ((a+b)+c) - (a+(b+c))
}
assert(ds.forall(d => d == 0.0))

Obě aserce selžou, protože součty prováděné v různých pořadích, uvedou různé zaokrouhlovací chyby a to vede k různým finálním výsledkům.

Tohle je poměrně důležité, protože to kompilátorům svazuje ruce v optimalizacích, které můžou bezpečně udělat. Změna pořadí operací může vést ke zrychlení, ale taky k odlišnému výsledku.1


IEEE 754 specifikuje bitovou reprezentaci floatů a toho se dá využít k různým trikům. Jedním z nich je možnost transformovat float na int s tím, že vztah mezi floaty je stejný jako mezi nově vytvořenými inty. Tedy, když platí

a < b

tak platí i

floatToInt(a) < floatToInt(b)

a zároveň existuje transformace zpátky

intToFloat(floatToInt(f)) = f

Jde o pár operací, které ve Scale vypadají takto:

def flip(f: Int) = f ^ (-(f >>> 31) & 0x7FFFFFFF)

def floatToInt(f: Float) = flip(Float.floatToRawIntBits(f))
def intToFloat(i: Int)   = Float.intBitsToFloat(flip(i))

Kód je založený na tomto článku, jen jsem ho upravil pro znaménkové inty Javy. V podstatě jde o to, že pokud je float záporný, převrátím všechny bity kromě znaménka. Tím se bitová reprezentace bude chovat stejně jako int.

Když vynesu průběh intů a floatů na intervalu všech bitových hodnot od samých nul nalevo až po samé jedničky napravo, inty mají takovýhle průběh:

Floaty zase takovýto (NaNy by měly být někde za nekonečny, ale ty si teď dovolím ignorovat):

Kód jen překlopí druhou polovinu intervalu.

Tohohle triku se dá využít k jednoduchému řazení floatů radix sortem. Na začátku floaty převedu na inty zachovávající pořadí, rozjedu radix sort pro inty a na konci výsledek přepíšu zpátky na foaty. Existují i přímé způsoby, jak radixsortovat floaty, ale ty jsou komplikovanější.


Dále k tématu


Pozn.

  1. Některé architektury můžou provádět výpočty s vyšší přesností, třeba 80 bitů namísto 64 a výsledek nakonec zaokrouhlit na 64, viz strictpf.

Flattr this!

Posted in low level | 2 Comments

Závislost je špatná (pro vaše programy i pro váš hardware)

Když jsem nedávno civěl do zdrojáků knihovny breeze, narazil jsem na kód pro slakární součin, který vypadal zhruba takhle:

double sum0, sum1, sum2, sum3 = 0.0;

// ...

for(int i = 0; i < length; i += 4) {
        sum0 += a[i+0] * b[i+0];
        sum1 += a[i+1] * b[i+1];
        sum2 += a[i+2] * b[i+2];
        sum3 += a[i+3] * b[i+3];
}

// ...

return sum0 + sum1 + sum2 + sum3;

Jde o tuto smyčku, čtyřikrát odrolovanou (ve zdrojácích je to 8× ale tady uvádím jen 4× kvůli prostoru):

double sum = 0.0;
for (int i = 0; i < length; i++) {
        sum += a[i] * b[i];
}

Zajímavé je, že součet se neprovádí do jedné proměnné, ale paralelně do čtyř s tím, že na konci funkce se částečné součty zkombinují.

Chvíli jsem dumal nad tím, proč je to napsané zrovna takhle a pak mi došlo, že je to vlastně docela chytrá optimalizace, která zkracuje řetězy závislostí mezi jednotlivými operacemi a může tak vést k většímu instrukčnímu paralelismu.

Když vezmu kód, který sčítá do jedné proměnné

double sum = 0.0;

for (int i = 0; i < length; i += 4) {
        sum += a[i+0] * b[i+0];
        sum += a[i+1] * b[i+1];
        sum += a[i+2] * b[i+2];
        sum += a[i+3] * b[i+3];
}

závislosti mezi jednotlivými operacemi vypadají takhle:

Schéma ukazuje dvě iterace odrolované smyčky a je na něm jasně vidět řetěz sčítání, který musí být proveden sekvenčně. Wide issue a out-of-order procesory, které dokážou odbavit několik nezávislých operací v jenom taktu, teoreticky můžou vykonat všechny čtyři násobení a i += 4 paralelně. Součty však musí počítat sekvenčně.

Naproti tomu verze s částečnými součty má tento graf závislostí:

Jak je vidět, součty na sobě vzájemně nezávisí a hardware je může provést paralelně, jakmile je předchozí várka součtů hotová. A co víc, tento kód má jen jeden krok k plné vektorizaci pomocí FMA instrukcí.

Kompilátor tuto optimalizaci ale nemůže provést jen tak. Floating point operace nejsou komutativní nebo asociativní a když změním pořadí v jakém provádím součty, dostanu lehce odlišný výsledek. V tomhle případě to nevadí a explicitně paralelní verze je tedy preferovaná.

To je jen další příklad, že nejlépe se paralelizují nezávislé operace a platí to na všech úrovních – pro paralelizaci na úrovni strojů, vláken nebo instrukcí.


Dále k tématu:

Flattr this!

Posted in CPU | 2 Comments

Od pohledu dobrý, aneb jak najít skoro stejné obrázky mezi dvěma miliony souborů za méně než deset minut

A tenhle znáte: „Proč je Pentium rychlejší než 486? 486 počítá, ale Pentium jen odhaduje.“ Přestože jde o nejapný vtip o chybě v prvních Pentiích z počítačového pravěku, má v sobě zrnko pravdy: Odhad může být mnohem rychlejší než přesný výpočet.

Odhad, pokud chci, aby byl k něčemu dobrý, není vůbec jednoduchou záležitostí. Nestačí jen vyprodukovat nějaké hausnumero, ale přibližnou odpověď, která má jasně definovanou chybu. Mezi algoritmy, které dělají právě tohle patří kromě klasických velkých jmen jako Bloomfilter, HyperLogLog, Count-min Skech, MinHash a SimHash, patří také perceptuální hashe určené k detekci téměř shodných obrázků.

Všechny zmíněné algoritmy se spoléhají na hashovací funkce, ale některé (MinHash, SimHash a perceptuální hashe o kterých to dneska celé je) jinak než je běžně známé.

Klasické kryptografické hashe, jako je MD5, SHA1 a jejich kamarádi, jsou navrženy tak, že změna jednoho bitu vstupu v ideálním případě znamená změnu poloviny bitů výstupu. Nekryptografické hashe, běžně používané v programování, nemají tuto vlastnost. Typicky jde aspoň o univerzální hash nebo o nějaký k-nezávislý hash. Otisk je přesto výsledkem divokých permutací vstupních dat a neměl by zachovávat jejich korelace. Perceptuální hash je přesně opačný – jde o takový otisk obrázku, který zachovává korelace vstupních dat – když jsou si obrázky na vstupu podobné, budou si s největší pravděpodobností podobné i jejich otisky a míra podobnosti obrázků odpovídá podobnosti hashů.

Algoritmus je jednoduchý:

  1. Zmenším obrázek například na 32×32 pixelů (dělám to kvůli zrychlení následujících výpočtů a ne pro redukci vysokých frekvencí)
  2. Převedu na odstíny šedi (opět kvůli zrychlení následujících kro­ků)
  3. Spočítám 32×32 diskrétní kosinovou transformaci (DCT)
  4. Zredukuji DCT a nechám si jen prvních 8×8 hodnot (kromě té úplně první, která by rozhodila průměr). Tyto hodnoty reprezentují nejnižší frekvence obrázku, tedy ty, které popisují hrubou strukturu.
  5. Spočítám průměrnou hodnotu těchto frekvencí
  6. Vytvořím finální hash o délce 64 bitů – daný bit má hodnotu 1, když je korespondující frekvence vyšší než průměr, 0, pokud je menší

Takto získám perceptuální hash obrázku, kterému nevadí změny barevnosti, světlosti (kvůli tomu, že zaznamenávám rozdíl proti průměru) a ani určité úpravy obrázku samotného. Nejnižší frekvence zaznamenávají nejhrubší kontury a rozdělení světel a stínů a jsou proto odolné vůči změnám v detailech.

Když pak chci porovnat, jak jsou si dva obrázky podobné, spočítám Hammingovu vzdálenost mezi jejich perceptuálními hashi. Výsledkem je počet bitů, ve kterých se oba hashe liší. Když se neliší v žádném bitu, jde o dva vizuálně téměř identické obrázky. Možná byl jednou uložen jako jpg a podruhé jako png. Když se liší jen pár bitů (±5) jde nejspíš o jeden a tentýž obrázek s drobnými úpravami, když se liší ve více bitech, jde s největší pravděpodobností o rozdílné obrázky.

Zkusmo jsem si perceptuální hashování napsal v jazyce Go (respektive bezostyšně ukradl přepsal tohle z Javy). Kompletní kód je k vidění v samostatném gistu.

func ImagePHash(img image.Image) uint64 {
  c := make([]float64, 32)
  for i := 1; i < 32; i++ {
    c[i] = 1
  }
  c[0] = 1 / math.Sqrt(2.0)

  // Reduce size and reduce color
  img = grayscale(resize.Resize(32, 32, img, resize.Bilinear))

  vals := make([][]float64, 32)
  for i := range vals {
    vals[i] = make([]float64, 32)
  }

  bounds := img.Bounds()
  for x := 0; x < bounds.Max.X; x++ {
    for y := 0; y < bounds.Max.Y; y++ {
      _, _, b, _ := img.At(x,y).RGBA()
      vals[x][y] = float64(b)
    }
  }

  /* Compute the DCT */
  dctVals := applyDCT(vals, c)

  // Reduce the DCT and compute the average value (excluding the first term)
  total := 0.0

  for x := 0; x < 8; x++ {
    for y := 0; y < 8; y++ {
      total += dctVals[x][y]
    }
  }
  total -= dctVals[0][0]

  avg := total / float64((8 * 8) - 1)

  // Further reduce the DCT and create 64 bit hash
  hash := uint64(0)

  for x := 0; x < 8; x++ {
    for y := 0; y < 8; y++ {
      if !(x == 0 && y == 0) {
        if dctVals[x][y] > avg {
          hash = hash | (1 << (63-uint64(x*8+y)))
        }
      }
    }
  }

  return hash
}

func applyDCT(f [][]float64, c []float64) [][]float64 {
  F := make([][]float64, 32)
  for i := range F {
    F[i] = make([]float64, 32)
  }

  cosines := [32][32]float64 {}
  for a := 0; a < 32 ; a++ {
    for b := 0; b < 32 ; b++ {
      cosines[a][b] = math.Cos((float64(2*a+1) / float64(2*32)) * float64(b) * math.Pi)
    }
  }

  for u := 0; u < 32; u++ {
    for v := 0; v < 32; v++ {
      sum := 0.0
      for i := 0; i < 32; i++ {
        for j := 0; j < 32; j++ {
          sum += cosines[i][u] * cosines[j][v] * f[i][j]
        }
      }
      sum *= (c[u]*c[v] / 4.0)
      F[u][v] = sum
    }
  }
  return F
}

Jak je vidět, jde o pár řádků kódu a výsledky jsou překvapivě přesné.

(nebo, nebo, nebo)

Přesto nejde o neomylný algoritmus. Kód si pohrává s pravděpodobnostmi a někdy, když se plete, bývají výsledky veselé. Zvlášť, když zvýším toleranci podobnosti.

Teď už zbývá jen vyřešit, jak to dělat rychle, když chci projít v titulku zmíněné dva miliony obrázků pod deset minut.

Samotné porovnání p-hashů se může zdát jako dostatečně rychlé. V ideálním případě jde o pouhé 2 instrukce: xor a popcnt + něco málo okolo.1 Přesto, pokud chci porovnat všechny hashe se všemi ostatními, musím udělat kvadratické množství práce a velké číslo se umocněním na druhou bohužel nezmenší.

Naštěstí můžu všechno znatelně urychlit pomocí postupu známého jako locality-sensitive hashing (LSH) – další z metod, která si hraje s pravděpodobnostmi a obětuje něco málo přesnosti ve prospěch masivního zrychlení.

LSH využívá hashe zachovávající lokalitu (podobné věci mají podobné hashe) a snaží se položky rozdělit do skupin, kdy v každé skupině jsou položky, které si můžou být podobné, ale nejsou tam ty, které jsou očividně rozdílné.

V tomto případě zkonstruuji LSH pro Hammingovu vzdálenost tak, že 64 bitů hashe rozsekám na 8 pruhů (band) po 8 bitech a každý hash přiřadím do skupiny (bucket) určenou bity každého pruhu.

To znamená, že když prvních osm bitů hashe má hodnotu 0xFF, v prvním pruhu ho přiřadím do bucketu 0xFF, když má druhých 8 bitů 0x0A, v druhém pruhu spadne do skupiny 0x0A a tak dále.

Celé to funguje na jednoduchém předpokladu: Pokud jsou si dva hashe velice podobné, s největší pravděpodobností se shodují v bitech jednoho nebo více pruhů.2 Když chci najít všechny hashe podobné hashi A, vezmu jeho prvních 8 bitů, podívám se do odpovídajícího bucketu prvního pruhu a spočítám podobnost jen s těmito kandidáty, pak vezmu dalších osm bitů a spočítám podobnost s těmito kandidáty. To udělám jednou pro každý pruh a mám výsledné podobné hashe. Protože jde o pravděpodobnostní metodu, může se stát, že některé skutečně podobné hashe to přeskočí, protože se vždy o jeden bit lišily ve všech pruzích a proto skončily v odlišných bucketech a vůbec nebyly vybráni jako kandidáti. Pokud se zajímám o velice podobné hashe, je tato šance velice malá. Pokud chci v takto vytvořeném LSH najít hashe lišící se v méně než 8 bitech, je garantováno, že všechny dostanu, protože se aspoň v jednom pruhu musí shodovat.

S LSH, kdy mám 8 pruhů po 8 bitech, mi musím udělat jen 8/256 práce – 32× méně, protože se musím podívat do 8 bucketů (jeden v každém pruhu) a spočítat podobnosti ve všemi kandidáty, kterých je 1/256. Pokud to nestačí, můžu tyto dva parametry zvolit podle toho, jak je třeba: když chci víc přesnosti zvolím menší šířku pruhu (12×5 bitů), pokud chci více rychlosti, zvolím širší pruhy (6×10 bitů).

No a s tímto „zlepšovákem“ dokážu bez větších problémů porovnat ony dva miliony obrázků pod 10 minut.

pattern := "/path/to/images/*"
fs, _ := filepath.Glob(pattern)

files  := make([]string, 0, len(fs))
hashes := make([]uint64, 0, len(fs))

for _, fullPath := range fs {

  reader, err := os.Open(fullPath)
  if err != nil { continue }

  img, _, err := image.Decode(reader)
  if err != nil { continue }

  hash := ImagePHash(img)
  files  = append(files, fullPath)
  hashes = append(hashes, hash)

  reader.Close()
}


// create LSH map

const Bands = 8
const BandBits = 8
const BandMask = (1 << BandBits) - 1

bands := [Bands][BandMask+1][]IdxHashPair {}

for idx, hash := range hashes {
  for b := 0 ; b < Bands ; b++ {
    h := hash << uint32(b*BandBits) & BandMask
    bands[b][h] = append(bands[b][h], IdxHashPair { idx, hash })
  }
}


// compute similarities

for idx, hash := range hashes {
  for b := 0 ; b < Bands ; b++ {
    h := hash << uint32(b*BandBits) & BandMask

    for _, candidate := range bands[b][h] {
      if idx <= candidate.Idx { continue }

      diff := differentBits(hash, candidate.Hash)
      if diff < 6  {
        similarity := 1.0 - (float64(diff) / float64(64))
        fmt.Printf("%s %s %d\n", files[idx], files[candidate.Idx], similarity)
      }
    }
  }
}

To je všechno. Záhada vyřešena.


Protože tuto techniku používám na několika místech (například pro detekci podobných článků na devblozích, tady, na téhle maličkosti a na mnoha dalších místech), tak jsem si proto napsal vlastní knihovnu nazvanou prozaicky sketches, která obsahuje implementaci PHashe, skečů, LSH, ALS, FunkSVD nebo DBSCAN clusterování a dalších nesouvisejících vě­cí.


Dále k tématu:


Pozn:

  1. V Go se bohužel k popcnt bez tanečků v assembleru nedá dostat a tak je třeba si napsat utilitu, která dělá to samé, ale potřebuje o něco víc instrukcí.
  2. Pravděpodobnost, že LSH odhalí dva kandidáty je 1 – (1 – pn)b, kde p je podobnost mezi dvěma kandidáty, n šířka pruhu a b je počet pruhů. Pomocí tohoto vztahu můžu najít hodnoty n a p, které budou fungovat pro požadovaný práh podobnosti.

Flattr this!

Posted in Algo, DS | Leave a comment